[GitHub] [kafka] cmccabe opened a new pull request, #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-12 Thread via GitHub


cmccabe opened a new pull request, #14010:
URL: https://github.com/apache/kafka/pull/14010

   Implement some of the metrics from KIP-938: Add more metrics for measuring 
KRaft performance.
   
   Add these metrics to QuorumControllerMetrics:
   kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
   
kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
   
kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
   kafka.controller:type=KafkaController,name=NewActiveControllersCount
   
   Create LoaderMetrics with these new metrics:
   kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
   kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount
   
   Create SnapshotEmitterMetrics with these new metrics:
   kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
   kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-07-12 Thread via GitHub


vamossagar12 commented on PR #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1633559424

   @mjsax thanks for notifying and that's ok :D Even I am not too up to date 
with the latest developments on Streams so in a way I dont think I was able to 
do full justification to this. Thanks for all the review and help on this  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #14003: KAFKA-15182: Normalize source connector offsets before invoking SourceConnector::alterOffsets

2023-07-12 Thread via GitHub


yashmayya commented on code in PR #14003:
URL: https://github.com/apache/kafka/pull/14003#discussion_r1261985349


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1608,6 +1612,32 @@ void modifySourceConnectorOffsets(String connName, 
Connector connector, Map
+ * Visible for testing.
+ *
+ * @param originalOffsets the offsets that are to be normalized
+ * @return the normalized offsets
+ */
+@SuppressWarnings("unchecked")
+Map, Map> 
normalizeSourceConnectorOffsets(Map, Map> 
originalOffsets) {
+Map, Map> normalizedOffsets = new 
HashMap<>();
+for (Map.Entry, Map> entry : 
originalOffsets.entrySet()) {
+OffsetUtils.validateFormat(entry.getKey());
+OffsetUtils.validateFormat(entry.getValue());
+byte[] serializedKey = internalKeyConverter.fromConnectData("", 
null, entry.getKey());

Review Comment:
   This should be safe to do because the `OffsetStorageReaderImpl` also 
serializes the connector / task specified source partition before retrieving 
its corresponding source offset. The difference here is that there is an extra 
ser / deser hop although that shouldn't cause issues. So, for instance:
   
   ```
   Map p1 = Collections.singletonMap("partition_key", 10);
   Map p2 = Collections.singletonMap("partition_key", 10L);
   
   ByteBuffer serializedP1 = ByteBuffer.wrap(converter.fromConnectData("", 
null, p1));
   ByteBuffer serializedP2 = ByteBuffer.wrap(converter.fromConnectData("", 
null, p2));
   
   assertTrue(serializedP1.equals(serializedP2));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261948519


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   > Hmmm... wouldn't that be a pretty serious breaking change if we 
accidentally switched up how the JSON converter deserializes integer types? Not 
just for the file source connector, but for plenty of others.
   
   Okay, that's fair enough, I've changed the check in 
`FileStreamSourceConnector::alterOffsets` to mirror the one made in the task at 
startup for consistency (and avoided making changes in the existing task 
logic). This does mean that this PR should be merged after 
https://github.com/apache/kafka/pull/14003 has been merged (assuming that that 
approach is acceptable).
   
   > I don't know if this significantly changes the conversation but it seems 
subtle and counterintuitive enough to bring up so that we can avoid 
accidentally breaking connector code that relies on this behavior.
   
   Hm yeah, that's definitely another interesting one to bring up - however, 
I'd contend that that one kinda makes sense since we're passing the 
`SourceRecord` itself - tasks already deal with `SourceRecord` and their 
offsets (and associated types) in their regular lifecycle. It would be highly 
confusing if the `SourceRecord` that they get in `commitRecord` doesn't match 
the one they dispatched to the framework via `poll`. Of course, ideally, the 
offsets that they read via `OffsetStorageReader` should also not have any type 
mismatches compared to the `SourceRecord` ones, but I don't think we'd want to 
(or safely could) change that at this point.
   
   Since the offsets being altered externally would correspond to the ones that 
the connector / tasks read at startup, I think it makes sense to align the 
types across invocations to `SourceConnector::alterOffsets` and offsets queried 
from an `OffsetStorageReader` (and an implicit separate alignment between the 
`SourceRecord`'s offsets types).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261948519


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   > Hmmm... wouldn't that be a pretty serious breaking change if we 
accidentally switched up how the JSON converter deserializes integer types? Not 
just for the file source connector, but for plenty of others.
   
   Okay, that's fair enough, I've changed the check in 
`FileStreamSourceConnector::alterOffsets` to mirror the one made in the task at 
startup for consistency (and avoided making changes in the existing task logic).
   
   > I don't know if this significantly changes the conversation but it seems 
subtle and counterintuitive enough to bring up so that we can avoid 
accidentally breaking connector code that relies on this behavior.
   
   Hm yeah, that's definitely another interesting one to bring up - however, 
I'd contend that that one kinda makes sense since we're passing the 
`SourceRecord` itself - tasks already deal with `SourceRecord` and their 
offsets (and associated types) in their regular lifecycle. It would be highly 
confusing if the `SourceRecord` that they get in `commitRecord` doesn't match 
the one they dispatched to the framework via `poll`. Of course, ideally, the 
offsets that they read via `OffsetStorageReader` should also not have any type 
mismatches compared to the `SourceRecord` ones, but I don't think we'd want to 
(or safely could) change that at this point.
   
   Since the offsets being altered externally would correspond to the ones that 
the connector / tasks read at startup, I think it makes sense to align the 
types across invocations to `SourceConnector::alterOffsets` and offsets queried 
from an `OffsetStorageReader` (and an implicit separate alignment between the 
`SourceRecord`'s offsets types).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-07-12 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742624#comment-17742624
 ] 

hudeqi commented on KAFKA-14912:


Hi [~divijvaidya] [~manyanda] I am also interested, if you are busy later, I 
can provide alternative support.:D

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Manyanda Chitimbo
>Priority: Major
>
> Context: We need to make the 1024 value here [1] as dynamically configurable
> [1] 
> https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #13996: KAFKA-15022: [2/N] introduce graph to compute min cost

2023-07-12 Thread via GitHub


mjsax commented on code in PR #13996:
URL: https://github.com/apache/kafka/pull/13996#discussion_r1261835084


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class Graph> {
+public class Edge implements Comparable {
+final V destination;
+final int capacity;
+final int cost;
+int residualFlow;
+int flow;
+Edge counterEdge;
+
+public Edge(final V destination, final int capacity, final int cost, 
final int residualFlow, final int flow) {
+Objects.requireNonNull(destination);
+this.destination = destination;
+this.capacity = capacity;
+this.cost = cost;
+this.residualFlow = residualFlow;
+this.flow = flow;
+}
+
+@Override
+public int compareTo(final Edge o) {

Review Comment:
   `compareTo` is to establish an order, right? Why do we order by 
`(destination,capacity,cost)`; does is matter, or could we use any order as 
long as deterministic?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class Graph> {
+public class Edge implements Comparable {
+final V destination;
+final int capacity;
+final int cost;
+int residualFlow;
+int flow;
+Edge counterEdge;
+
+public Edge(final V destination, final int capacity, final int cost, 
final int residualFlow, final int flow) {
+Objects.requireNonNull(destination);
+this.destination = destination;
+this.capacity = capacity;
+this.cost = cost;
+this.residualFlow = residualFlow;
+this.flow = flow;
+}
+
+@Override
+public int compareTo(final Edge o) {
+int compare = destination.compareTo(o.destination);
+if (compare != 0) {
+return compare;
+}
+
+compare = capacity - o.capacity;
+if (compare != 0) {
+return compare;
+}
+
+return cost - o.cost;
+}
+
+@Override
+public boolean equals(final Object other) {
+if (this == other) {
+return true;
+}
+if (other == null || other.getClass() != getClass()) {
+return false;
+}
+
+final Graph.Edge otherEdge = (Graph.Edge) other;
+
+return destination.equals(otherEdge.destination) && capacity == 
otherEdge.capacity
+

[GitHub] [kafka] hni61223 opened a new pull request, #14009: MINOR: Add dual write offset metric

2023-07-12 Thread via GitHub


hni61223 opened a new pull request, #14009:
URL: https://github.com/apache/kafka/pull/14009

   - Add dual write offset metric for ZK migration
   - Tested in jconsole
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache

2023-07-12 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742602#comment-17742602
 ] 

Lan Ding commented on KAFKA-15169:
--

Sure, I will add tests for RemoteIndexCache in days.

> Add tests for RemoteIndexCache
> --
>
> Key: KAFKA-15169
> URL: https://issues.apache.org/jira/browse/KAFKA-15169
> Project: Kafka
>  Issue Type: Test
>Reporter: Satish Duggana
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> Follow-up from 
> https://github.com/apache/kafka/pull/13275#discussion_r1257490978



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15169) Add tests for RemoteIndexCache

2023-07-12 Thread Lan Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lan Ding reassigned KAFKA-15169:


Assignee: Lan Ding

> Add tests for RemoteIndexCache
> --
>
> Key: KAFKA-15169
> URL: https://issues.apache.org/jira/browse/KAFKA-15169
> Project: Kafka
>  Issue Type: Test
>Reporter: Satish Duggana
>Assignee: Lan Ding
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> Follow-up from 
> https://github.com/apache/kafka/pull/13275#discussion_r1257490978



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-07-12 Thread via GitHub


junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1261801130


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -500,11 +504,13 @@ class ReplicaManager(val config: KafkaConfig,
   // Delete log and corresponding folders in case replica manager 
doesn't hold them anymore.
   // This could happen when topic is being deleted while broker is 
down and recovers.
   stoppedPartitions += topicPartition -> deletePartition
+  if (remoteLogManager.isDefined)
+partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
   Hmm, it seems this case can occur during partition reassignment. In that 
case, we don't want to delete the remote data, right?



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 }
 }
 
+public void cleanupDeletedRemoteLogSegments() {

Review Comment:
   This process runs every replica. So, we will be deleting the same remote 
segment multiple times?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
* Stop the given partitions.
*
-   * @param partitionsToStopA map from a topic partition to a boolean 
indicating
-   *whether the partition should be deleted.
+   * @param partitionsToStopA map from a topic partition to a 
boolean indicating
+   *whether the partition should be 
deleted.
+   * @param partitionsMaybeToDeleteRemote   A set of topic partitions that may 
need to delete
+   *remote segments.
*
-   * @returnA map from partitions to exceptions which 
occurred.
-   *If no errors occurred, the map will be empty.
+   * @returnA map from partitions to 
exceptions which occurred.
+   *If no errors occurred, the map 
will be empty.
*/
   protected def stopPartitions(
-partitionsToStop: Map[TopicPartition, Boolean]
+partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
   It seems that the implementation doesn't support KRaft controller. Do we 
plan to support that for the 3.6.0 release?



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1159,6 +1161,9 @@ class LogManager(logDirs: Seq[File],
   checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
 }
 addLogToBeDeleted(removedLog)
+if (deleteRemote && removedLog.remoteLogEnabled())
+  RemoteLogManager.addTopicIdToBeDeleted(removedLog.topicIdAsJava)

Review Comment:
   LogManager only manages local data. So, it's a bit weird to have it call 
RemoteLogManager.



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 }
 }
 
+public void cleanupDeletedRemoteLogSegments() {
+if (isCancelled())
+return;
+
+Uuid topicId = topicIdPartition.topicId();
+if (deletedTopicIds.contains(topicId)) {
+cleanupAllRemoteLogSegments();
+cancelRLMtask();
+deletedTopicIds.remove(topicId);
+}
+}
+
+private void cleanupAllRemoteLogSegments() {

Review Comment:
   Since this runs asynchronously after topic deletion completes, if every 
replica is restarted before all remote segments are deleted, we will never be 
able to remove the remaining remote segments for the deleted topics?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax closed pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-07-12 Thread via GitHub


mjsax closed pull request #11433: KAFKA-13295: Avoiding Transation timeouts 
arising due to long restora…
URL: https://github.com/apache/kafka/pull/11433


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2023-07-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13295.
-
Resolution: Fixed

With the new restore-thread, this issue should be resolved implicilty.

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15184) New consumer internals refactoring and clean up

2023-07-12 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15184:
--
Description: Minor refactoring of the new consumer internals including 
introduction of the {{RequestManagers}} class to hold references to the 
{{RequestManager}} instances.  (was: Minor refactoring of the new consumer 
internals including introduction of the RequestManagers class to hold the 
various RequestManager instances.)

> New consumer internals refactoring and clean up
> ---
>
> Key: KAFKA-15184
> URL: https://issues.apache.org/jira/browse/KAFKA-15184
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-945
>
> Minor refactoring of the new consumer internals including introduction of the 
> {{RequestManagers}} class to hold references to the {{RequestManager}} 
> instances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hni61223 closed pull request #14007: MINOR: Add metrics for ZK migration

2023-07-12 Thread via GitHub


hni61223 closed pull request #14007: MINOR: Add metrics for ZK migration
URL: https://github.com/apache/kafka/pull/14007


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15184) New consumer internals refactoring and clean up

2023-07-12 Thread Kirk True (Jira)
Kirk True created KAFKA-15184:
-

 Summary: New consumer internals refactoring and clean up
 Key: KAFKA-15184
 URL: https://issues.apache.org/jira/browse/KAFKA-15184
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


Minor refactoring of the new consumer internals including introduction of the 
RequestManagers class to hold the various RequestManager instances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-07-12 Thread via GitHub


philipnee commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1261798307


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -522,7 +525,35 @@ public void subscribe(Collection topics, 
ConsumerRebalanceListener callb
 
 @Override
 public void assign(Collection partitions) {
-throw new KafkaException("method not implemented");
+if (partitions == null) {
+throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
+}
+
+if (partitions.isEmpty()) {
+this.unsubscribe();
+return;
+}
+
+for (TopicPartition tp : partitions) {
+String topic = (tp != null) ? tp.topic() : null;
+if (Utils.isBlank(topic))
+throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
+}
+// TODO: implement fetcher
+// fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+// make sure the offsets of topic partitions the consumer is 
unsubscribing from
+// are committed since there will be no following rebalance
+commit(subscriptions.allConsumed());

Review Comment:
   Hey @junrao - Sorry I misunderstood your concern.  To your question: If the 
coordinator is not available, the commit request manager won't be built and 
this commit() will be skipped.  In the `RequestManagers.java` we've got
   ```
   if (groupRebalanceConfig != null && groupRebalanceConfig.groupId != null) {
   final GroupState groupState = new 
GroupState(groupRebalanceConfig);
   coordinator = new CoordinatorRequestManager(time,
   logContext,
   retryBackoffMs,
   errorEventHandler,
   groupState.groupId);
   commit = new CommitRequestManager(time, logContext, 
subscriptions, config, coordinator, groupState);
   }
   ```
   
   to your second question: autoCommit is still guarded by the config per this 
line in `CommitRequestManager.java`
   ```
   private void maybeAutoCommit() {
  if (!autoCommitState.isPresent()) {
   return;
   }
  // autocommit otherwise
   }
   ```
   
   On the Consumer API level, we treat sync/async commit to be the same, except 
sync commit waits for the completion of the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on pull request #1864: KAFKA-4177: Remove ThrottledReplicationRateLimit from Server Config

2023-07-12 Thread via GitHub


junrao commented on PR #1864:
URL: https://github.com/apache/kafka/pull/1864#issuecomment-1633289594

   @nkostoulas : Since this requires a config change, it will need a KIP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-07-12 Thread via GitHub


junrao commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1261763698


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -522,7 +525,35 @@ public void subscribe(Collection topics, 
ConsumerRebalanceListener callb
 
 @Override
 public void assign(Collection partitions) {
-throw new KafkaException("method not implemented");
+if (partitions == null) {
+throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
+}
+
+if (partitions.isEmpty()) {
+this.unsubscribe();
+return;
+}
+
+for (TopicPartition tp : partitions) {
+String topic = (tp != null) ? tp.topic() : null;
+if (Utils.isBlank(topic))
+throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
+}
+// TODO: implement fetcher
+// fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+// make sure the offsets of topic partitions the consumer is 
unsubscribing from
+// are committed since there will be no following rebalance
+commit(subscriptions.allConsumed());

Review Comment:
   @philipnee: `KafkaConsumer.assign()` has the following code. 
   
   ```
   // make sure the offsets of topic partitions the consumer is 
unsubscribing from
   // are committed since there will be no following rebalance
   if (coordinator != null)
   
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
   ```
   
   1. In the old code, if the groupId is not specified, coordinator will be 
null and the offset commit will be skipped. In this PR, we always call offset 
commit.
   2. In the old code, the implementation of `maybeAutoCommitOffsetsAsync()` is 
guarded by the `ENABLE_AUTO_COMMIT_CONFIG` config. In this PR, there is no such 
guard.
   
   Are both changes expected?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261758965


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+private static final long WORK_TIMEOUT_MS = 200L;

Review Comment:
   Yeah. I got confused by this as well. That was my understanding. That the 
clock doesn't actually advance unless we expire an event. And we only advance 
it to that time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hni61223 opened a new pull request, #14007: KMETA-988: Add metrics for ZK migration

2023-07-12 Thread via GitHub


hni61223 opened a new pull request, #14007:
URL: https://github.com/apache/kafka/pull/14007

   - Add dual write offset metric for ZK migration
   - Tested in jconsole
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-07-12 Thread via GitHub


ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1633242202

   @fvaleri, I worked a bit after your comments and still have a few questions.
   First, tell me, please, how you prepare your environment to run `$ 
bin/kafka-get-offsets.sh`. Is there any guide or quickstart about it? I don't 
know yet, how to test my editions in that file correctly. Then I need to know, 
which options are considered to be deprecated. It's for improvement you 
suggested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah closed pull request #14006: MINOR Add metrics for ZK migration

2023-07-12 Thread via GitHub


mumrah closed pull request #14006: MINOR Add metrics for ZK migration
URL: https://github.com/apache/kafka/pull/14006


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jeffkbkim commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261730522


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+private static final long WORK_TIMEOUT_MS = 200L;

Review Comment:
   to confirm, we don't actually advance the timer by 200ms every iteration but 
wait for a max 200ms if there are no tasks to expire.
   
   this is more of a question on system timer, but in 
`SystemTimer#advanceClock()`
   ```
   timingWheel.advanceClock(bucket.getExpiration());
   ```
   how is the timer in-sync with the actual time if we advance clock by a 
custom amount?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+private static final long WORK_TIMEOUT_MS = 200L;
+
+class Reaper extends ShutdownableThread {
+Reaper(String name) {
+super(name, false);
+}
+
+@Override
+public void doWork() {
+try {
+timer.advanceClock(WORK_TIMEOUT_MS);
+} catch (InterruptedException ex) {
+// Ignore.
+}
+}
+}
+
+private Timer timer;
+private Reaper reaper;

Review Comment:
   can these be final?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a 

[jira] [Comment Edited] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-07-12 Thread Manyanda Chitimbo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742554#comment-17742554
 ] 

Manyanda Chitimbo edited comment on KAFKA-14912 at 7/12/23 8:35 PM:


Hi [~divijvaidya] Yes, I'll be interested in picking this up. Thanks for 
suggesting it to me. I can start to have a look at it tomorrow. I'll re-assign 
the issue to myself. 


was (Author: JIRAUSER299903):
Hi [~divijvaidya] Yes, I'll be interested in picking this up. I can start to 
have a look at it tomorrow. I'll re-assign the issue to myself. 

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> Context: We need to make the 1024 value here [1] as dynamically configurable
> [1] 
> https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-07-12 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo reassigned KAFKA-14912:
-

Assignee: Manyanda Chitimbo  (was: Kamal Chandraprakash)

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Manyanda Chitimbo
>Priority: Major
>
> Context: We need to make the 1024 value here [1] as dynamically configurable
> [1] 
> https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-07-12 Thread Manyanda Chitimbo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742554#comment-17742554
 ] 

Manyanda Chitimbo commented on KAFKA-14912:
---

Hi [~divijvaidya] Yes, I'll be interested in picking this up. I can start to 
have a look at it tomorrow. I'll re-assign the issue to myself. 

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> Context: We need to make the 1024 value here [1] as dynamically configurable
> [1] 
> https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-12 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259040832


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -228,14 +261,21 @@ public List build(TopicsImage topicsImage) {
 
 static class GroupMetadataManagerTestContext {
 static class Builder {
-final private Time time = new MockTime();
 final private LogContext logContext = new LogContext();
 final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
 private MetadataImage metadataImage;
-private List assignors;

Review Comment:
   we get illegal state exception if it's not initialized and since it doesn't 
affect the old protocol i thought it best to initialize it here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hni61223 opened a new pull request, #14006: KMETA-988: Add metrics for ZK migration

2023-07-12 Thread via GitHub


hni61223 opened a new pull request, #14006:
URL: https://github.com/apache/kafka/pull/14006

   - Add dual write offset metric for ZK migration
   - Tested in jconsole
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-07-12 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14912:
-
Description: 
Context: We need to make the 1024 value here [1] as dynamically configurable

[1] 
https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> Context: We need to make the 1024 value here [1] as dynamically configurable
> [1] 
> https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-07-12 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742550#comment-17742550
 ] 

Divij Vaidya commented on KAFKA-14912:
--

[~hudeqi] [~manyanda] [~ivanyu] are you folks interested in picking this one up?

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-12 Thread via GitHub


gharris1727 commented on PR #13971:
URL: https://github.com/apache/kafka/pull/13971#issuecomment-1633140141

   @C0urante Could you review this? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-07-12 Thread via GitHub


gharris1727 commented on PR #13313:
URL: https://github.com/apache/kafka/pull/13313#issuecomment-1633138815

   Hey @ijuma Do you have the system test results for this branch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261646923


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Hmmm... wouldn't that be a pretty serious breaking change if we accidentally 
switched up how the JSON converter deserializes integer types? Not just for the 
file source connector, but for plenty of others.
   
   It feels like it might be a better use of our time to make note of this 
possibility and ensure that we have sufficient unit testing in place to prevent 
that kind of regression (I suspect we already do but haven't verified this yet).
   
   Of course, because things aren't interesting enough already--it turns out 
that there's actually two different scenarios in which tasks observe offsets 
for their connector. The first, which we're all familiar with, is when they 
query them using an 
[OffsetStorageReader](https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java),
 which in distributed mode reflects the contents of the offsets topic. The 
second is when 
[SourceTask::commitRecord](https://kafka.apache.org/35/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata))
 is invoked, which carries with it the just-ack'd `SourceRecord` instance 
originally provided by the task, including the original in-memory source 
partition and source offset, which may use types that get lost when written to 
and read back from the offsets topic.
   
   I don't know if this significantly changes the conversation but it seems 
subtle and counterintuitive enough to bring up so that we can avoid 
accidentally breaking connector code that relies on this behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261646923


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Hmmm... wouldn't that be a pretty serious breaking change if we accidentally 
switched up how the JSON converter deserializes integer types? Not just for the 
file source connector, but for plenty of others.
   
   It feels like it might be a better use of our time to make note of this 
possibility and ensure that we have sufficient unit testing in place to prevent 
that kind of regression (I suspect we already do but haven't verified this yet).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-12 Thread via GitHub


C0urante commented on PR #14005:
URL: https://github.com/apache/kafka/pull/14005#issuecomment-1633085269

   @yashmayya do you have time to take a look at this? Feel like it may be 
useful to you with the work we were discussing on 
https://github.com/apache/kafka/pull/13945 about how we want to set examples 
with our implementation of this API, and possibly modifying the types of the 
objects that the framework passes to the `alterOffsets` method.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector

2023-07-12 Thread via GitHub


gharris1727 merged PR #13992:
URL: https://github.com/apache/kafka/pull/13992


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-12 Thread via GitHub


C0urante commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1261630589


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -597,7 +596,9 @@ private Set listPartitions(
 Admin admin,
 Collection topics
 ) throws TimeoutException, InterruptedException, ExecutionException {
-assertFalse("collection of topics may not be empty", topics.isEmpty());

Review Comment:
   Removed assertions from this API as it added an implicit dependency on JUnit 
4, which we don't use in the `:connect:mirror` module.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector

2023-07-12 Thread via GitHub


gharris1727 commented on PR #13992:
URL: https://github.com/apache/kafka/pull/13992#issuecomment-1633079621

   The flaky CI failures look unrelated and the mirror unit tests pass locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante opened a new pull request, #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-12 Thread via GitHub


C0urante opened a new pull request, #14005:
URL: https://github.com/apache/kafka/pull/14005

   [Jira](https://issues.apache.org/jira/browse/KAFKA-15177), 
[KIP-875](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect)
   
   
   ### Summary
   Implements the new `alterOffsets` method in all three MM2 connectors. The 
only connector that makes use of the Connect framework-managed offsets at 
runtime is the `MirrorSourceConnector`, but since the other two connectors also 
emit offsets with their records, we also allow users to add/remove offsets for 
them as well, as long as the offsets match the format of the ones that are 
emitted by the connectors.
   
   ### Integration testing
   An integration test case is added (or really, augmented) that focuses 
primarily on the `MirrorSourceConnector`. This case piggybacks off of the 
existing `MirrorConnectorsExactlyOnceIntegrationTest::testReplication` test 
case, which is done for a few reasons:
   - Every new case added to the `MirrorConnectorsIntegrationBaseTest` is run 
by that class and all of its subclasses (of which there are currently five), 
which can lead to blot in testing runtime
   - Graceful shutdown of MM2 tasks may not always occur in a timely fashion, 
which can cause them to emit stale offsets after they are supposed to have been 
stopped; running with exactly-once support eliminates this risk
   - Some subclasses of the `MirrorConnectorsIntegrationBaseTest` 
(specifically, the `MirrorConnectorsIntegrationTransactionsTest` suite) cause 
unpredictable behavior with offsets that can make it harder to verify an exact 
number of records in a replicated topic due to, e.g., control records
   
   Still, this is a little hacky. Happy to change things up if we think it's 
worth the work to find something cleaner.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph opened a new pull request, #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-12 Thread via GitHub


kamalcph opened a new pull request, #14004:
URL: https://github.com/apache/kafka/pull/14004

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-12 Thread via GitHub


kamalcph commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1261602111


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local 
file system to store
+ * offloaded log segments and associated data.
+ * 
+ * Due to the consistency semantic of POSIX-compliant file systems, this 
remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the 
copy to the storage succeeded.
+ * 
+ * 
+ * In order to guarantee isolation, independence, reproducibility and 
consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via 
the storage 

[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-12 Thread via GitHub


kamalcph commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1261600532


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageHistory.java:
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.stream;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Accumulates and retains the interactions between brokers and {@link 
LocalTieredStorage} instances.
+ * These interactions are modelled via events of type {@link 
LocalTieredStorageEvent}.
+ *
+ * Events from an instance of storage are captured by the {@link 
LocalTieredStorageHistory} after
+ * {@link LocalTieredStorageHistory#listenTo(LocalTieredStorage)} is called.
+ */
+/* @ThreadSafe */
+public final class LocalTieredStorageHistory {

Review Comment:
   yes, you're right. History is maintained only in memory and to assert the 
interaction with the `RemoteStorageManager`. 
   
   No plans made so far to make it durable. This framework is mainly used to 
assert the functionality end-to-end instead of stress-test where there can be a 
huge number of events for a single test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-12 Thread via GitHub


kamalcph commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1261592637


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local 
file system to store
+ * offloaded log segments and associated data.
+ * 
+ * Due to the consistency semantic of POSIX-compliant file systems, this 
remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the 
copy to the storage succeeded.
+ * 
+ * 
+ * In order to guarantee isolation, independence, reproducibility and 
consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via 
the storage 

[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-12 Thread via GitHub


kamalcph commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1261590988


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local 
file system to store
+ * offloaded log segments and associated data.
+ * 
+ * Due to the consistency semantic of POSIX-compliant file systems, this 
remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the 
copy to the storage succeeded.
+ * 
+ * 
+ * In order to guarantee isolation, independence, reproducibility and 
consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via 
the storage 

[jira] [Updated] (KAFKA-14993) Improve TransactionIndex instance handling while copying to and fetching from RSM.

2023-07-12 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-14993:
-
Description: 
RSM should throw a ResourceNotFoundException if it does not have 
TransactionIndex. Currently, it expects an empty InputStream and creates an 
unnecessary file in the cache. This can be avoided by catching 
ResourceNotFoundException and not creating an instance. There are minor 
cleanups needed in RemoteIndexCache and other TransactionIndex usages.

Also, update the LocalTieredStorage, see 
[this|https://github.com/apache/kafka/pull/13837#discussion_r1258917584] 
comment.

  was:RSM should throw a ResourceNotFoundException if it does not have 
TransactionIndex. Currently, it expects an empty InputStream and creates an 
unnecessary file in the cache. This can be avoided by catching 
ResourceNotFoundException and not creating an instance. There are minor 
cleanups needed in RemoteIndexCache and other TransactionIndex usages.


> Improve TransactionIndex instance handling while copying to and fetching from 
> RSM.
> --
>
> Key: KAFKA-14993
> URL: https://issues.apache.org/jira/browse/KAFKA-14993
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> RSM should throw a ResourceNotFoundException if it does not have 
> TransactionIndex. Currently, it expects an empty InputStream and creates an 
> unnecessary file in the cache. This can be avoided by catching 
> ResourceNotFoundException and not creating an instance. There are minor 
> cleanups needed in RemoteIndexCache and other TransactionIndex usages.
> Also, update the LocalTieredStorage, see 
> [this|https://github.com/apache/kafka/pull/13837#discussion_r1258917584] 
> comment.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-12 Thread via GitHub


kamalcph commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1261588861


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local 
file system to store
+ * offloaded log segments and associated data.
+ * 
+ * Due to the consistency semantic of POSIX-compliant file systems, this 
remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the 
copy to the storage succeeded.
+ * 
+ * 
+ * In order to guarantee isolation, independence, reproducibility and 
consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via 
the storage 

[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-12 Thread via GitHub


kamalcph commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1261587596


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local 
file system to store
+ * offloaded log segments and associated data.
+ * 
+ * Due to the consistency semantic of POSIX-compliant file systems, this 
remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the 
copy to the storage succeeded.
+ * 
+ * 
+ * In order to guarantee isolation, independence, reproducibility and 
consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via 
the storage 

[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261578373


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+public class SystemTimerReaperTest {
+private static class FutureTimerTask extends TimerTask {
+CompletableFuture future = new CompletableFuture<>();
+
+FutureTimerTask(long delayMs) {
+super(delayMs);
+}
+
+@Override
+public void run() {
+// We use org.apache.kafka.common.errors.TimeoutException to 
differentiate
+// from java.util.concurrent.TimeoutException.
+future.completeExceptionally(new TimeoutException(
+String.format("Future failed to be completed before timeout of 
%sMs ms was reached", delayMs)));
+}
+}
+
+private  CompletableFuture add(Timer timer, long delayMs) {
+FutureTimerTask task = new FutureTimerTask<>(delayMs);
+timer.add(task);
+return task.future;
+}
+
+@Test
+public void testReaper() throws Exception {
+Timer timer = new SystemTimerReaper("reaper", new 
SystemTimer("timer"));
+try {
+CompletableFuture t1 = add(timer, 100L);
+CompletableFuture t2 = add(timer, 200L);
+CompletableFuture t3 = add(timer, 300L);
+TestUtils.assertFutureThrows(t1, TimeoutException.class);

Review Comment:
   I'm a little confused at how we ensure that the given amount of time has 
passed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-12 Thread via GitHub


kamalcph commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1261578158


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local 
file system to store
+ * offloaded log segments and associated data.
+ * 
+ * Due to the consistency semantic of POSIX-compliant file systems, this 
remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the 
copy to the storage succeeded.
+ * 
+ * 
+ * In order to guarantee isolation, independence, reproducibility and 
consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via 
the storage 

[jira] [Created] (KAFKA-15183) Add more controller, loader, snapshot emitter metrics

2023-07-12 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15183:


 Summary: Add more controller, loader, snapshot emitter metrics
 Key: KAFKA-15183
 URL: https://issues.apache.org/jira/browse/KAFKA-15183
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe


Add the controller, loader, and snapshot emitter metrics from KIP-938.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-12 Thread via GitHub


kamalcph commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1261567166


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local 
file system to store
+ * offloaded log segments and associated data.
+ * 
+ * Due to the consistency semantic of POSIX-compliant file systems, this 
remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the 
copy to the storage succeeded.
+ * 
+ * 
+ * In order to guarantee isolation, independence, reproducibility and 
consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via 
the storage 

[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261559993


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2360,6 +2360,19 @@ public void testGroupIdsByTopics() {
 assertEquals(Collections.emptySet(), 
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
 }
 
+@Test
+public void testOnNewMetadataImageWithEmptyDelta() {

Review Comment:
   This is the test for the ofNullable change? Do we also need to check we 
don't notify any groups?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-12 Thread via GitHub


kamalcph commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1261560074


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local 
file system to store
+ * offloaded log segments and associated data.
+ * 
+ * Due to the consistency semantic of POSIX-compliant file systems, this 
remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the 
copy to the storage succeeded.
+ * 
+ * 
+ * In order to guarantee isolation, independence, reproducibility and 
consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via 
the storage 

[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261559993


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2360,6 +2360,19 @@ public void testGroupIdsByTopics() {
 assertEquals(Collections.emptySet(), 
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
 }
 
+@Test
+public void testOnNewMetadataImageWithEmptyDelta() {

Review Comment:
   This is the test for the ofNullable change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261554932


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+private static final long WORK_TIMEOUT_MS = 200L;
+
+class Reaper extends ShutdownableThread {
+Reaper(String name) {
+super(name, false);
+}
+
+@Override
+public void doWork() {
+try {
+timer.advanceClock(WORK_TIMEOUT_MS);

Review Comment:
   How did we decide upon 200?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261554932


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+private static final long WORK_TIMEOUT_MS = 200L;
+
+class Reaper extends ShutdownableThread {
+Reaper(String name) {
+super(name, false);
+}
+
+@Override
+public void doWork() {
+try {
+timer.advanceClock(WORK_TIMEOUT_MS);

Review Comment:
   How did we decide upon 200? Is my understanding correct that we only execute 
the event if it expiration is within 200 ms of the current time?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-07-12 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742508#comment-17742508
 ] 

Kamal Chandraprakash commented on KAFKA-14912:
--

I'm not working on this one currently. We can reassign it. 

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #13997: KAFKA-15180: Generalize integration tests to change use of KafkaConsumer to Consumer

2023-07-12 Thread via GitHub


divijvaidya commented on PR #13997:
URL: https://github.com/apache/kafka/pull/13997#issuecomment-1632968446

   Unrelated test failures:
   ```
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_11_and_Scala_2_13___testReplication__/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders__/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders___2/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
   [Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_17_and_Scala_2_13___testReplicateSourceDefault__/)
   [Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.testConcurrentlyAccessThreads()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.streams.integration/AdjustStreamThreadCountTest/Build___JDK_20_and_Scala_2_13___testConcurrentlyAccessThreads__/)
   [Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/)
   [Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders___2/)
   ```
   
   I will leave this PR around for some time in case someone wants to add 
additional comments. Will merge after 2-3 days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261521364


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {

Review Comment:
   We needed this because there wasn't a thread actually running the timer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261515978


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Since the type alignment issue seemed like a broader one (i.e. not scoped to 
the file connectors being touched here), I've created a separate 
[ticket](https://issues.apache.org/jira/browse/KAFKA-15182) and 
[PR](https://github.com/apache/kafka/pull/14003) for it.
   
   > it doesn't seem like a great endorsement of our API if we have to 
implement workarounds in the file connectors, which are the first example of 
the connector API that many developers see.
   
   I'd argue that it isn't really a workaround and that the current check 
itself is bad. If the (de)serialization happened to use `Integer` for values 
that fit in a 32 bit signed type (which would be perfectly valid and is exactly 
what we see currently before the values are passed through the converter), the 
current check in the `FileStreamSourceTask` would cause it to bail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya opened a new pull request, #14003: KAFKA-15182: Normalize source connector offsets before invoking SourceConnector::alterOffsets

2023-07-12 Thread via GitHub


yashmayya opened a new pull request, #14003:
URL: https://github.com/apache/kafka/pull/14003

   - From https://issues.apache.org/jira/browse/KAFKA-15182:
   
   >See discussion 
[here](https://github.com/apache/kafka/pull/13945#discussion_r1260946148)
   >
   > TLDR: When users attempt to externally modify source connector offsets via 
the `PATCH /offsets` endpoint (introduced in 
[KIP-875](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect)),
 type mismatches can occur between offsets passed to 
`SourceConnector::alterOffsets` and the offsets that are retrieved by 
connectors / tasks via an instance of `OffsetStorageReader` after the offsets 
have been modified. In order to prevent this type mismatch that could lead to 
subtle bugs in connectors, we could serialize + deserialize the offsets using 
the worker's internal JSON converter before invoking 
`SourceConnector::alterOffsets`.
   
   - I've also added a small unit test, verified that the existing offsets API 
related integration tests are passing, and tested this patch out manually with 
the `FileStreamSourceConnector`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15182) Normalize offsets before invoking SourceConnector::alterOffsets

2023-07-12 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15182:
--

 Summary: Normalize offsets before invoking 
SourceConnector::alterOffsets
 Key: KAFKA-15182
 URL: https://issues.apache.org/jira/browse/KAFKA-15182
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


See discussion 
[here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148]

 

TLDR: When users attempt to externally modify source connector offsets via the 
{{PATCH /offsets}} endpoint (introduced in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]),
 type mismatches can occur between offsets passed to 
{{SourceConnector::alterOffsets}} and the offsets that are retrieved by 
connectors / tasks via an instance of {{OffsetStorageReader }}after the offsets 
have been modified. In order to prevent this type mismatch that could lead to 
subtle bugs in connectors, we could serialize + deserialize the offsets using 
the worker's internal JSON converter before invoking 
{{{}SourceConnector::alterOffsets{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #13853: KAFKA-15088: Fixing Incorrect Reference Usage in Connector State Changes

2023-07-12 Thread via GitHub


gharris1727 commented on PR #13853:
URL: https://github.com/apache/kafka/pull/13853#issuecomment-1632939211

   Hey @daehokimm Thanks for the PR!
   
   I think that there is some semantic difference between 
`AbstractStatus.State` and `ConnectorStatus.State`, and certainly between 
`ConnectorStatus.State` and `TaskStatus.State`. Unfortunately Java doesn't seem 
to think so: there's no difference in the type-checker to separate these 
different types. Without the type-checker to enforce this, I don't think we're 
going to catch all of the mistakes that already exist, or prevent new ones from 
being added.
   
   What do you think about refactoring this AbstractStatus.State into distinct 
enums? It would help us to find all of the other places that this typo may 
exist.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest

2023-07-12 Thread via GitHub


philipnee commented on PR #13956:
URL: https://github.com/apache/kafka/pull/13956#issuecomment-1632936434

   Hey @divijvaidya - I actually tried to fix this probably a year ago, and my 
guess was some of the consumer didn't close cleanly and caused leak.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261496901


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
-  @ClusterTest(serverProperties = Array(
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(

Review Comment:
   I guess we will do many of these test changes in a followup though?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261492024


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
-  @ClusterTest(serverProperties = Array(
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(

Review Comment:
   Hmmm does this also mean that we are no longer testing the old group 
coordinator (at least on this test)
   
   EDIT: I see the name of the test now  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261492024


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
-  @ClusterTest(serverProperties = Array(
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(

Review Comment:
   Hmmm does this also mean that we are no longer testing the old group 
coordinator (at least on this test)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1261475273


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -175,7 +176,7 @@ object Defaults {
   val ConsumerGroupMinHeartbeatIntervalMs = 5000
   val ConsumerGroupMaxHeartbeatIntervalMs = 15000
   val ConsumerGroupMaxSize = Int.MaxValue
-  val ConsumerGroupAssignors = ""
+  val ConsumerGroupAssignors = List(classOf[RangeAssignor].getName).asJava

Review Comment:
   Did we need to do this because the default couldn't be empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-07-12 Thread via GitHub


vamossagar12 commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-1632883931

   > I took a look at the changes and they look fine. If I understand correctly 
we're moving these classes to server-common while we have to keep MirrorMaker. 
Then in Kafka 4.0 (and assuming 
[KAFKA-14525](https://issues.apache.org/jira/browse/KAFKA-14525) is complete) 
we will be able to move all these classes to tools. Is that right?
   
   Thanks @mimaison , yes that's the idea. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1261444204


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -3057,6 +3057,56 @@ public void 
testSenderShouldRetryWithBackoffOnRetriableError() {
 assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
 }
 
+@Test
+public void testReceiveFailedBatchTwiceWithTransactions() throws Exception 
{
+ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+TransactionManager txnManager = new TransactionManager(logContext, 
"testFailTwice", 6, 100, apiVersions);
+
+setupWithTransactionState(txnManager);
+doInitTransactions(txnManager, producerIdAndEpoch);
+
+txnManager.beginTransaction();
+txnManager.maybeAddPartition(tp0);
+client.prepareResponse(buildAddPartitionsToTxnResponseData(0, 
Collections.singletonMap(tp0, Errors.NONE)));
+sender.runOnce();
+
+// Send first ProduceRequest
+Future request1 = appendToAccumulator(tp0);
+sender.runOnce();  // send request
+
+Node node = metadata.fetch().nodes().get(0);
+time.sleep(2000L);
+client.disconnect(node.idString(), true);
+client.backoff(node, 10);
+
+sender.runOnce(); // now expire the batch.
+assertFutureFailure(request1, TimeoutException.class);
+
+time.sleep(20);
+
+sendIdempotentProducerResponse(0, tp0, Errors.INVALID_RECORD, -1);
+sender.runOnce(); // receive late response
+
+// Loop once and confirm that the transaction manager does not enter a 
fatal error state
+sender.runOnce();
+assertTrue(txnManager.hasAbortableError());
+TransactionalRequestResult result = txnManager.beginAbort();
+sender.runOnce();
+
+respondToEndTxn(Errors.NONE);
+sender.runOnce();
+assertTrue(txnManager::isInitializing);

Review Comment:
   I'm following conventions from the rest of the file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1261443466


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -3057,6 +3057,56 @@ public void 
testSenderShouldRetryWithBackoffOnRetriableError() {
 assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
 }
 
+@Test
+public void testReceiveFailedBatchTwiceWithTransactions() throws Exception 
{
+ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+TransactionManager txnManager = new TransactionManager(logContext, 
"testFailTwice", 6, 100, apiVersions);
+
+setupWithTransactionState(txnManager);
+doInitTransactions(txnManager, producerIdAndEpoch);
+
+txnManager.beginTransaction();
+txnManager.maybeAddPartition(tp0);
+client.prepareResponse(buildAddPartitionsToTxnResponseData(0, 
Collections.singletonMap(tp0, Errors.NONE)));
+sender.runOnce();
+
+// Send first ProduceRequest
+Future request1 = appendToAccumulator(tp0);
+sender.runOnce();  // send request
+
+Node node = metadata.fetch().nodes().get(0);
+time.sleep(2000L);
+client.disconnect(node.idString(), true);
+client.backoff(node, 10);
+
+sender.runOnce(); // now expire the batch.

Review Comment:
   I think it is both? This wording is the same as the other tests in the file 
that expire batches. (There are 4 others in the file). If i remove one or the 
other the test doesn't work correctly.
   
   Without the disconnect, we are not really able to receive two responses 
since I modified the disconnect code. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261413255


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Ah, nice catch! I noticed the discrepancy in numeric types while working on 
[KAFKA-15177](https://issues.apache.org/jira/browse/KAFKA-15177) but hadn't 
even considered the possibility of aligning the types across invocations of 
`alterOffsets` and `OffsetStorageReader::offset`/`OffsetStorageReader::offsets`.
   
   I think re-deserializing the offsets before passing them to `alterOffsets` 
is a great idea. Unless the request body is gigantic there shouldn't be serious 
performance concerns, and it also acts as a nice preflight check to ensure that 
the offsets can be successfully propagated to the connector's tasks through the 
offsets topic.
   
   I still don't love permitting string types for the connector's `position` 
offset values--it doesn't seem like a great endorsement of our API if we have 
to implement workarounds in the file connectors, which are the first example of 
the connector API that many developers see.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nkostoulas opened a new pull request, #14002: Move throttling rate to KafkaConfig

2023-07-12 Thread via GitHub


nkostoulas opened a new pull request, #14002:
URL: https://github.com/apache/kafka/pull/14002

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary

2023-07-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14938:
--
Fix Version/s: 3.5.2

> Flaky test 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
> --
>
> Key: KAFKA-14938
> URL: https://issues.apache.org/jira/browse/KAFKA-14938
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sambhav Jain
>Priority: Major
> Fix For: 3.6.0, 3.4.2, 3.5.2
>
>
> Test seems to be failing with 
> ```
> ava.lang.AssertionError: Not enough records produced by source connector. 
> Expected at least: 100 + but got 72
> h4. Stacktrace
> java.lang.AssertionError: Not enough records produced by source connector. 
> Expected at least: 100 + but got 72
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.assertTrue(Assert.java:42)
>  at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>  at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
>  at 
> org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
>  at 
> 

[jira] [Updated] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary

2023-07-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14938:
--
Fix Version/s: 3.4.2

> Flaky test 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
> --
>
> Key: KAFKA-14938
> URL: https://issues.apache.org/jira/browse/KAFKA-14938
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sambhav Jain
>Priority: Major
> Fix For: 3.6.0, 3.4.2
>
>
> Test seems to be failing with 
> ```
> ava.lang.AssertionError: Not enough records produced by source connector. 
> Expected at least: 100 + but got 72
> h4. Stacktrace
> java.lang.AssertionError: Not enough records produced by source connector. 
> Expected at least: 100 + but got 72
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.assertTrue(Assert.java:42)
>  at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>  at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
>  at 
> org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
>  at 
> 

[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1261385072


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -3057,6 +3057,56 @@ public void 
testSenderShouldRetryWithBackoffOnRetriableError() {
 assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
 }
 
+@Test
+public void testReceiveFailedBatchTwiceWithTransactions() throws Exception 
{
+ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+TransactionManager txnManager = new TransactionManager(logContext, 
"testFailTwice", 6, 100, apiVersions);
+
+setupWithTransactionState(txnManager);
+doInitTransactions(txnManager, producerIdAndEpoch);
+
+txnManager.beginTransaction();
+txnManager.maybeAddPartition(tp0);
+client.prepareResponse(buildAddPartitionsToTxnResponseData(0, 
Collections.singletonMap(tp0, Errors.NONE)));
+sender.runOnce();
+
+// Send first ProduceRequest
+Future request1 = appendToAccumulator(tp0);
+sender.runOnce();  // send request
+
+Node node = metadata.fetch().nodes().get(0);
+time.sleep(2000L);

Review Comment:
   If we don't sleep, then the future does not complete. There is another test 
in the file that explains this only expires the batch if it hits the delivery 
timeout. Note in the example, time.sleep(1000) was already called.
   
   ```
   // We add 600 millis to expire the first batch but not the second.
   // Note deliveryTimeoutMs is 1500.
time.sleep(600L);
   client.disconnect(node.idString());
   client.backoff(node, 10);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-07-12 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1261385072


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -3057,6 +3057,56 @@ public void 
testSenderShouldRetryWithBackoffOnRetriableError() {
 assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
 }
 
+@Test
+public void testReceiveFailedBatchTwiceWithTransactions() throws Exception 
{
+ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+TransactionManager txnManager = new TransactionManager(logContext, 
"testFailTwice", 6, 100, apiVersions);
+
+setupWithTransactionState(txnManager);
+doInitTransactions(txnManager, producerIdAndEpoch);
+
+txnManager.beginTransaction();
+txnManager.maybeAddPartition(tp0);
+client.prepareResponse(buildAddPartitionsToTxnResponseData(0, 
Collections.singletonMap(tp0, Errors.NONE)));
+sender.runOnce();
+
+// Send first ProduceRequest
+Future request1 = appendToAccumulator(tp0);
+sender.runOnce();  // send request
+
+Node node = metadata.fetch().nodes().get(0);
+time.sleep(2000L);

Review Comment:
   If we don't sleep, then the future does not complete. There is another test 
in the file that explains this only expires the batch if it hits the delivery 
timeout
   
   ```
   // We add 600 millis to expire the first batch but not the second.
   // Note deliveryTimeoutMs is 1500.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13975: KAFKA-15161: Fix InvalidReplicationFactorException at connect startup

2023-07-12 Thread via GitHub


viktorsomogyi commented on code in PR #13975:
URL: https://github.com/apache/kafka/pull/13975#discussion_r1261378645


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -174,14 +174,25 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 errorUnavailableEndpoints: Boolean = false,
 errorUnavailableListeners: Boolean = false): 
Seq[MetadataResponseTopic] = {
 val image = _currentImage
-topics.toSeq.flatMap { topic =>
-  getPartitionMetadata(image, topic, listenerName, 
errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+if (!isInitialized()) {
+  topics.toSeq.map(topic =>

Review Comment:
   In case of Zookeeper-Kafka I think this would be a good solution. With KRaft 
it seems though that it already returns `UNKNOWN_TOPIC_OR_PARTITION` error so 
the fix may not be needed there. I'll do some more digging tomorrow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13975: KAFKA-15161: Fix InvalidReplicationFactorException at connect startup

2023-07-12 Thread via GitHub


viktorsomogyi commented on code in PR #13975:
URL: https://github.com/apache/kafka/pull/13975#discussion_r1261378645


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -174,14 +174,25 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 errorUnavailableEndpoints: Boolean = false,
 errorUnavailableListeners: Boolean = false): 
Seq[MetadataResponseTopic] = {
 val image = _currentImage
-topics.toSeq.flatMap { topic =>
-  getPartitionMetadata(image, topic, listenerName, 
errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+if (!isInitialized()) {
+  topics.toSeq.map(topic =>

Review Comment:
   In case of Zookeeper-Kafka I think this would be a good solution. With KRaft 
it seems though that it already returns `UNKNOWN_TOPIC_OR_PARTITION` error so 
the fix may not needed there. I'll do some more digging tomorrow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-07-12 Thread via GitHub


C0urante merged PR #13646:
URL: https://github.com/apache/kafka/pull/13646


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary

2023-07-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14938:
--
Fix Version/s: 3.6.0

> Flaky test 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
> --
>
> Key: KAFKA-14938
> URL: https://issues.apache.org/jira/browse/KAFKA-14938
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sambhav Jain
>Priority: Major
> Fix For: 3.6.0
>
>
> Test seems to be failing with 
> ```
> ava.lang.AssertionError: Not enough records produced by source connector. 
> Expected at least: 100 + but got 72
> h4. Stacktrace
> java.lang.AssertionError: Not enough records produced by source connector. 
> Expected at least: 100 + but got 72
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.assertTrue(Assert.java:42)
>  at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>  at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
>  at 
> org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
>  at 
> 

[GitHub] [kafka] nicktelford commented on a diff in pull request #13993: KAFKA-15178: Improve ConsumerCoordinator.poll perf

2023-07-12 Thread via GitHub


nicktelford commented on code in PR #13993:
URL: https://github.com/apache/kafka/pull/13993#discussion_r1261346057


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -119,6 +119,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 private Set joinedSubscription;
 private MetadataSnapshot metadataSnapshot;
 private MetadataSnapshot assignmentSnapshot;
+private boolean metadataUpdated;

Review Comment:
   Note: in 633f0e7 I switched to using a `LinkedHashSet`, because it performs 
better for iteration (which is used during `equals`) than `HashSet`, at the 
cost of a little more memory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru opened a new pull request, #14001: Kafka Streams Threading: Exception handling

2023-07-12 Thread via GitHub


lucasbru opened a new pull request, #14001:
URL: https://github.com/apache/kafka/pull/14001

   Implements punctuation inside processing threads. The scheduler
   algorithm checks if a task that is not assigned currently can
   be punctuated, and returns it when a worker thread asks for the
   next task to be processed. Then, the processing thread runs all
   punctuations in the punctionation queue.
   
   Piggy-backed: take TaskExecutionMetadata into account when
   processing records.
   
   This is a stacked PR, only the last commit needs to be reviewed
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest

2023-07-12 Thread via GitHub


divijvaidya commented on code in PR #13956:
URL: https://github.com/apache/kafka/pull/13956#discussion_r1261325930


##
core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala:
##
@@ -346,6 +346,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest 
{
partitionsToAssign: 
Set[TopicPartition],
userRebalanceListener: 
ConsumerRebalanceListener)
 extends ShutdownableThread("daemon-consumer-assignment", false) {
+setDaemon(true)

Review Comment:
   I added this as a fail safe but yes, it's not required if we close them 
properly. I have reverted them back in the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-07-12 Thread via GitHub


fvaleri commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-1632708004

   Yes, once the MirrorMaker1 dependency will be gone in 4.0.0, we can move 
them to tools. I added a note in 
[KAFKA-14705](https://issues.apache.org/jira/browse/KAFKA-14705)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14705) Tools cleanup for the next major release

2023-07-12 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-14705:

Description: 
We can use this task to track tools cleanup for the next major release (4.0.0).

1. Redirections to be removed:
 - core/src/main/scala/kafka/tools/JmxTool
 - core/src/main/scala/kafka/tools/ClusterTool
 - core/src/main/scala/kafka/tools/StateChangeLogMerger
 - core/src/main/scala/kafka/tools/EndToEndLatency
 - core/src/main/scala/kafka/admin/FeatureCommand
 - core/src/main/scala/kafka/tools/StreamsResetter

2. Deprecated tools to be removed:
 - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger

3. TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" 
should be moved to "tools" once we get rid of MirrorMaker1 dependency.

4. We should also get rid of many deprecated options across all tools, 
including not migrated tools.

  was:
We can use this task to track tools cleanup for the next major release (4.0.0).
As part of this activity, we should also get rid of many deprecated options 
across all tools, including not migrated tools.

Redirections to be removed:
 - core/src/main/scala/kafka/tools/JmxTool
 - core/src/main/scala/kafka/tools/ClusterTool
 - core/src/main/scala/kafka/tools/StateChangeLogMerger
 - core/src/main/scala/kafka/tools/EndToEndLatency
 - core/src/main/scala/kafka/admin/FeatureCommand
 - core/src/main/scala/kafka/tools/StreamsResetter

Deprecated tools to be removed:
 - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger

TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" should 
be moved to "tools" once we get rid of MirrorMaker1 dependency.


> Tools cleanup for the next major release
> 
>
> Key: KAFKA-14705
> URL: https://issues.apache.org/jira/browse/KAFKA-14705
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Federico Valeri
>Priority: Major
> Fix For: 4.0.0
>
>
> We can use this task to track tools cleanup for the next major release 
> (4.0.0).
> 1. Redirections to be removed:
>  - core/src/main/scala/kafka/tools/JmxTool
>  - core/src/main/scala/kafka/tools/ClusterTool
>  - core/src/main/scala/kafka/tools/StateChangeLogMerger
>  - core/src/main/scala/kafka/tools/EndToEndLatency
>  - core/src/main/scala/kafka/admin/FeatureCommand
>  - core/src/main/scala/kafka/tools/StreamsResetter
> 2. Deprecated tools to be removed:
>  - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger
> 3. TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" 
> should be moved to "tools" once we get rid of MirrorMaker1 dependency.
> 4. We should also get rid of many deprecated options across all tools, 
> including not migrated tools.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] nicktelford commented on a diff in pull request #13993: KAFKA-15178: Improve ConsumerCoordinator.poll perf

2023-07-12 Thread via GitHub


nicktelford commented on code in PR #13993:
URL: https://github.com/apache/kafka/pull/13993#discussion_r1261318387


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -119,6 +119,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 private Set joinedSubscription;
 private MetadataSnapshot metadataSnapshot;
 private MetadataSnapshot assignmentSnapshot;
+private boolean metadataUpdated;

Review Comment:
   I've revised the implementation to instead attempt to optimize the 
comparison itself in 63ee9ab, but I'm not entirely sure that it will perform 
better.
   
   I'd like to construct a micro-benchmark for it, but I honestly have no idea 
how I'd run a micro-benchmark on this part of the codebase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14705) Tools cleanup for the next major release

2023-07-12 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-14705:

Description: 
We can use this task to track tools cleanup for the next major release (4.0.0).
As part of this activity, we should also get rid of many deprecated options 
across all tools, including not migrated tools.

Redirections to be removed:
 - core/src/main/scala/kafka/tools/JmxTool
 - core/src/main/scala/kafka/tools/ClusterTool
 - core/src/main/scala/kafka/tools/StateChangeLogMerger
 - core/src/main/scala/kafka/tools/EndToEndLatency
 - core/src/main/scala/kafka/admin/FeatureCommand
 - core/src/main/scala/kafka/tools/StreamsResetter

Deprecated tools to be removed:
 - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger

TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" should 
be moved to "tools" once we get rid of MirrorMaker1 dependency.

  was:
We can use this task to track tools cleanup for the next major release (4.0.0).
As part of this activity, we should also get rid of many deprecated options 
across all tools, including not migrated tools.

Redirections to be removed:
 - core/src/main/scala/kafka/tools/JmxTool
 - core/src/main/scala/kafka/tools/ClusterTool
 - core/src/main/scala/kafka/tools/StateChangeLogMerger
 - core/src/main/scala/kafka/tools/EndToEndLatency
 - core/src/main/scala/kafka/admin/FeatureCommand
 - core/src/main/scala/kafka/tools/StreamsResetter

Deprecated tools to be removed:
 - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger



> Tools cleanup for the next major release
> 
>
> Key: KAFKA-14705
> URL: https://issues.apache.org/jira/browse/KAFKA-14705
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Federico Valeri
>Priority: Major
> Fix For: 4.0.0
>
>
> We can use this task to track tools cleanup for the next major release 
> (4.0.0).
> As part of this activity, we should also get rid of many deprecated options 
> across all tools, including not migrated tools.
> Redirections to be removed:
>  - core/src/main/scala/kafka/tools/JmxTool
>  - core/src/main/scala/kafka/tools/ClusterTool
>  - core/src/main/scala/kafka/tools/StateChangeLogMerger
>  - core/src/main/scala/kafka/tools/EndToEndLatency
>  - core/src/main/scala/kafka/admin/FeatureCommand
>  - core/src/main/scala/kafka/tools/StreamsResetter
> Deprecated tools to be removed:
>  - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger
> TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" 
> should be moved to "tools" once we get rid of MirrorMaker1 dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-07-12 Thread via GitHub


mimaison commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-1632638912

   I took a look at the changes and they look fine. If I understand correctly 
we're moving these classes to server-common while we have to keep MirrorMaker. 
Then in Kafka 4.0 (and assuming 
[KAFKA-14525](https://issues.apache.org/jira/browse/KAFKA-14525) is complete) 
we will be able to move all these classes to tools. Is that right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13988: [KAFKA-15137] Do not log entire request payload in KRaftControllerChannelManager

2023-07-12 Thread via GitHub


mumrah commented on PR #13988:
URL: https://github.com/apache/kafka/pull/13988#issuecomment-1632618714

   Thanks @divijvaidya! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest

2023-07-12 Thread via GitHub


divijvaidya commented on PR #13956:
URL: https://github.com/apache/kafka/pull/13956#issuecomment-1632593879

   >Did you verify the thread leak by checking the threat dump? 
   
   No, I found the leak from a very useful line in the tests I added recently. 
It prints the names of the leaked thread, `Found 3 unexpected threads during 
@AfterAll: `controller-event-thread,daemon-bounce-broker-EventThread,Test 
worker-EventThread` ==> expected:  but was: `
   
   @philipnee I wanted your expert opinion on the possible reason for failure 
of `ConsumerBounceTest > testConsumptionWithBrokerFailures`. See the stack 
trace in the description. Is there anything that pops in your head that could 
make it less flaky (given where it fails in the stacktrace). Asking because, 
this failure was the cause of mayhem which leaked threads.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15181) Race condition on partition assigned to TopicBasedRemoteLogMetadataManager

2023-07-12 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-15181:


 Summary: Race condition on partition assigned to 
TopicBasedRemoteLogMetadataManager 
 Key: KAFKA-15181
 URL: https://issues.apache.org/jira/browse/KAFKA-15181
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jorge Esteban Quilcate Otoya
Assignee: Jorge Esteban Quilcate Otoya


TopicBasedRemoteLogMetadataManager (TBRLMM) uses a cache to be prepared whever 
partitions are assigned.

When partitions are assigned to the TBRLMM instance, a consumer is started to 
keep the cache up to date.

If the cache hasn't finalized to build, TBRLMM fails to return remote metadata 
about partitions that are store on the backing topic. TBRLMM may not recover 
from this failing state.

A proposal to fix this issue would be wait after a partition is assigned for 
the consumer to catch up. A similar logic is used at the moment when TBRLMM 
writes to the topic, and uses send callback to wait for consumer to catch up. 
This logic can be reused whever a partition is assigned, so when TBRLMM is 
marked as initialized, cache is ready to serve requests.


Reference: https://github.com/aiven/kafka/issues/33



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13585: KAFKA-14737: Move kafka.utils.json to server-common

2023-07-12 Thread via GitHub


OmniaGM commented on code in PR #13585:
URL: https://github.com/apache/kafka/pull/13585#discussion_r1261125892


##
server-common/src/main/java/org/apache/kafka/server/util/json/DecodeJson.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util.json;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public interface DecodeJson {
+/**
+ * Decode the JSON node provided into an instance of `T`.
+ *
+ * @throws JsonMappingException if `node` cannot be decoded into `T`.
+ */
+T decode(JsonNode node) throws JsonMappingException;
+
+static JsonMappingException throwJsonMappingException(String expectedType, 
JsonNode node) {
+return new JsonMappingException(null, String.format("Expected `%s` 
value, received %s", expectedType, node));
+}
+
+final class DecodeBoolean implements DecodeJson {
+@Override
+public Boolean decode(JsonNode node) throws JsonMappingException {
+if (node.isBoolean()) {
+return node.booleanValue();
+}
+throw throwJsonMappingException(Boolean.class.getSimpleName(), 
node);
+}
+}
+
+final class DecodeDouble implements DecodeJson {
+@Override
+public Double decode(JsonNode node) throws JsonMappingException {
+if (node.isDouble() || node.isLong() || node.isInt()) {
+return node.doubleValue();
+}
+throw throwJsonMappingException(Double.class.getSimpleName(), 
node);
+}
+}
+
+final class DecodeInteger implements DecodeJson {
+@Override
+public Integer decode(JsonNode node) throws JsonMappingException {
+if (node.isInt()) {
+return node.intValue();
+}
+throw throwJsonMappingException(Integer.class.getSimpleName(), 
node);
+}
+}
+
+final class DecodeLong implements DecodeJson {
+@Override
+public Long decode(JsonNode node) throws JsonMappingException {
+if (node.isLong() || node.isInt()) {
+return node.longValue();
+}
+throw throwJsonMappingException(Long.class.getSimpleName(), node);
+}
+}
+
+final class DecodeString implements DecodeJson {
+@Override
+public String decode(JsonNode node) throws JsonMappingException {
+if (node.isTextual()) {
+return node.textValue();
+}
+throw throwJsonMappingException(String.class.getSimpleName(), 
node);
+}
+}
+
+static  DecodeJson> decodeOptional(DecodeJson 
decodeJson) {
+return node -> {
+if (node.isNull()) return Optional.empty();
+return Optional.of(decodeJson.decode(node));
+};
+}
+
+static  DecodeJson> decodeList(DecodeJson decodeJson) {
+return node -> {
+if (node.isArray()) {
+List result = new ArrayList<>();
+Iterator elements = node.elements();
+while (elements.hasNext()) {
+try {
+result.add(decodeJson.decode(elements.next()));
+} catch (JsonMappingException e) {
+throw e;

Review Comment:
   Probably not, I was trying to stick to the original interface (which doesn't 
have any exception in the signature) as much as I could in case we switched 
everything to use this new class instead. I will remove `catch` and make 
`decodeList` throw `JsonMappingException` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics

2023-07-12 Thread via GitHub


divijvaidya commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1261124589


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
 BrokerTopicStats.TotalFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
 BrokerTopicStats.FetchMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
 BrokerTopicStats.ProduceMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+BrokerTopicStats.RemoteCopyBytesPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
+BrokerTopicStats.RemoteFetchBytesPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
+BrokerTopicStats.RemoteReadRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteReadRequestsPerSec, "requests"),
+BrokerTopicStats.RemoteWriteRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteWriteRequestsPerSec, "requests"),
+BrokerTopicStats.FailedRemoteReadRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.FailedRemoteReadRequestsPerSec, "requests"),
+BrokerTopicStats.FailedRemoteWriteRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.FailedRemoteWriteRequestsPerSec, "requests"),

Review Comment:
   We at (Amazon MSK) have an implementation against the KIP-405 interfaces [1] 
and we explicitly tell the customers that the metric such as 
`RemoteBytesInPerSec` are as per KIP-405 contract. If we change the KIP now, it 
would be a break in contract for our customers. Hence, I would not be in favour 
of amending the accepted KIP at this time.
   
   [1] see: RemoteBytesInPerSec at 
https://docs.aws.amazon.com/msk/latest/developerguide/metrics-details.html



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15132) Implement disable & re-enablement for Tiered Storage

2023-07-12 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742409#comment-17742409
 ] 

Divij Vaidya commented on KAFKA-15132:
--

Update - need more time. new ETA 10th July.

> Implement disable & re-enablement for Tiered Storage
> 
>
> Key: KAFKA-15132
> URL: https://issues.apache.org/jira/browse/KAFKA-15132
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: kip
>
> KIP-405 [1] introduces the Tiered Storage feature in Apache Kafka. One of the 
> limitations mentioned in the KIP is inability to re-enable TS on a topic 
> after it has been disabled.
> {quote}Once tier storage is enabled for a topic, it can not be disabled. We 
> will add this feature in future versions. One possible workaround is to 
> create a new topic and copy the data from the desired offset and delete the 
> old topic. 
> {quote}
> This task will propose a new KIP which extends on KIP-405 to describe the 
> behaviour on on disablement and re-enablement of tiering storage for a topic. 
> The solution will apply for both Zk and KRaft variants.
> [1] KIP-405 - 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14915) Option to consume multiple partitions that have their data in remote storage for the target offsets.

2023-07-12 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14915:
-
Description: Context: 
https://github.com/apache/kafka/pull/13535#discussion_r1171250580

> Option to consume multiple partitions that have their data in remote storage 
> for the target offsets.
> 
>
> Key: KAFKA-14915
> URL: https://issues.apache.org/jira/browse/KAFKA-14915
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> Context: https://github.com/apache/kafka/pull/13535#discussion_r1171250580



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeqo commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics

2023-07-12 Thread via GitHub


jeqo commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1261114941


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
 BrokerTopicStats.TotalFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
 BrokerTopicStats.FetchMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
 BrokerTopicStats.ProduceMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+BrokerTopicStats.RemoteCopyBytesPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
+BrokerTopicStats.RemoteFetchBytesPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
+BrokerTopicStats.RemoteReadRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteReadRequestsPerSec, "requests"),
+BrokerTopicStats.RemoteWriteRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteWriteRequestsPerSec, "requests"),
+BrokerTopicStats.FailedRemoteReadRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.FailedRemoteReadRequestsPerSec, "requests"),
+BrokerTopicStats.FailedRemoteWriteRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.FailedRemoteWriteRequestsPerSec, "requests"),

Review Comment:
   I was suspecting this would be the case. Though as metrics haven't been 
implemented yet, would it be possible to amend the KIP and ask for feedback?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-07-12 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742407#comment-17742407
 ] 

Divij Vaidya commented on KAFKA-14912:
--

[~ckamal] if you aren't actively working on this, can we have someone else in 
the community pick this up?

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on a diff in pull request #13585: KAFKA-14737: Move kafka.utils.json to server-common

2023-07-12 Thread via GitHub


mimaison commented on code in PR #13585:
URL: https://github.com/apache/kafka/pull/13585#discussion_r1261096752


##
server-common/src/main/java/org/apache/kafka/server/util/json/DecodeJson.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util.json;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public interface DecodeJson {
+/**
+ * Decode the JSON node provided into an instance of `T`.
+ *
+ * @throws JsonMappingException if `node` cannot be decoded into `T`.
+ */
+T decode(JsonNode node) throws JsonMappingException;
+
+static JsonMappingException throwJsonMappingException(String expectedType, 
JsonNode node) {
+return new JsonMappingException(null, String.format("Expected `%s` 
value, received %s", expectedType, node));
+}
+
+final class DecodeBoolean implements DecodeJson {
+@Override
+public Boolean decode(JsonNode node) throws JsonMappingException {
+if (node.isBoolean()) {
+return node.booleanValue();
+}
+throw throwJsonMappingException(Boolean.class.getSimpleName(), 
node);
+}
+}
+
+final class DecodeDouble implements DecodeJson {
+@Override
+public Double decode(JsonNode node) throws JsonMappingException {
+if (node.isDouble() || node.isLong() || node.isInt()) {
+return node.doubleValue();
+}
+throw throwJsonMappingException(Double.class.getSimpleName(), 
node);
+}
+}
+
+final class DecodeInteger implements DecodeJson {
+@Override
+public Integer decode(JsonNode node) throws JsonMappingException {
+if (node.isInt()) {
+return node.intValue();
+}
+throw throwJsonMappingException(Integer.class.getSimpleName(), 
node);
+}
+}
+
+final class DecodeLong implements DecodeJson {
+@Override
+public Long decode(JsonNode node) throws JsonMappingException {
+if (node.isLong() || node.isInt()) {
+return node.longValue();
+}
+throw throwJsonMappingException(Long.class.getSimpleName(), node);
+}
+}
+
+final class DecodeString implements DecodeJson {
+@Override
+public String decode(JsonNode node) throws JsonMappingException {
+if (node.isTextual()) {
+return node.textValue();
+}
+throw throwJsonMappingException(String.class.getSimpleName(), 
node);
+}
+}
+
+static  DecodeJson> decodeOptional(DecodeJson 
decodeJson) {
+return node -> {
+if (node.isNull()) return Optional.empty();
+return Optional.of(decodeJson.decode(node));
+};
+}
+
+static  DecodeJson> decodeList(DecodeJson decodeJson) {
+return node -> {
+if (node.isArray()) {
+List result = new ArrayList<>();
+Iterator elements = node.elements();
+while (elements.hasNext()) {
+try {
+result.add(decodeJson.decode(elements.next()));
+} catch (JsonMappingException e) {
+throw e;

Review Comment:
   Do we need this `catch` is we rethrow the same exception directly?
   Same in `decodeMap()` below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >