[GitHub] [kafka] cmccabe opened a new pull request, #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe opened a new pull request, #14010: URL: https://github.com/apache/kafka/pull/14010

Implement some of the metrics from KIP-938: Add more metrics for measuring KRaft performance.

Add these metrics to QuorumControllerMetrics:
kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
kafka.controller:type=KafkaController,name=NewActiveControllersCount

Create LoaderMetrics with these new metrics:
kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount

Create SnapshotEmitterMetrics with these new metrics:
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs
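Since these metric names follow the standard Kafka JMX naming scheme, one way to sanity-check one on a live controller is to walk its MBean attributes; a minimal sketch, assuming the metric is registered with the platform MBean server as existing controller metrics are (the attribute set depends on whether it is registered as a gauge or a counter):

```
import java.lang.management.ManagementFactory;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MetricPeek {
    public static void main(String[] args) throws Exception {
        // Runs inside the controller JVM (or adapt to a remote JMX connector)
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(
                "kafka.controller:type=KafkaController,name=NewActiveControllersCount");
        // Enumerate attributes rather than hard-coding one, since gauges and
        // counters expose different attribute names ("Value" vs "Count")
        for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
            System.out.println(attr.getName() + " = " + server.getAttribute(name, attr.getName()));
        }
    }
}
```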
[GitHub] [kafka] vamossagar12 commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
vamossagar12 commented on PR #11433: URL: https://github.com/apache/kafka/pull/11433#issuecomment-1633559424

@mjsax thanks for notifying, and that's ok :D I am not too up to date with the latest developments on Streams either, so I don't think I was able to do full justice to this. Thanks for all the review and help on this.
[GitHub] [kafka] yashmayya commented on a diff in pull request #14003: KAFKA-15182: Normalize source connector offsets before invoking SourceConnector::alterOffsets
yashmayya commented on code in PR #14003: URL: https://github.com/apache/kafka/pull/14003#discussion_r1261985349

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
## @@ -1608,6 +1612,32 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,

+/**
+ * Visible for testing.
+ *
+ * @param originalOffsets the offsets that are to be normalized
+ * @return the normalized offsets
+ */
+@SuppressWarnings("unchecked")
+Map<Map<String, ?>, Map<String, ?>> normalizeSourceConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> originalOffsets) {
+    Map<Map<String, ?>, Map<String, ?>> normalizedOffsets = new HashMap<>();
+    for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : originalOffsets.entrySet()) {
+        OffsetUtils.validateFormat(entry.getKey());
+        OffsetUtils.validateFormat(entry.getValue());
+        byte[] serializedKey = internalKeyConverter.fromConnectData("", null, entry.getKey());

Review Comment:
This should be safe to do because the `OffsetStorageReaderImpl` also serializes the connector / task specified source partition before retrieving its corresponding source offset. The difference here is that there is an extra ser / deser hop, although that shouldn't cause issues. So, for instance:

```
Map<String, Object> p1 = Collections.singletonMap("partition_key", 10);
Map<String, Object> p2 = Collections.singletonMap("partition_key", 10L);
ByteBuffer serializedP1 = ByteBuffer.wrap(converter.fromConnectData("", null, p1));
ByteBuffer serializedP2 = ByteBuffer.wrap(converter.fromConnectData("", null, p2));
assertTrue(serializedP1.equals(serializedP2));
```
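To see the round trip concretely, here is a self-contained sketch of the Integer-vs-Long behavior discussed above, assuming Connect's `JsonConverter` configured with schemas disabled (which is how the worker's internal offset converters run):

```
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class OffsetRoundTrip {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // Connect's internal offset converters run with schemas disabled
        converter.configure(Collections.singletonMap("schemas.enable", "false"), false);

        Map<String, Object> offset = Collections.singletonMap("position", 10); // Integer in memory
        byte[] serialized = converter.fromConnectData("", null, offset);
        SchemaAndValue deserialized = converter.toConnectData("", serialized);

        // With no schema, the JSON converter maps integral numbers to INT64, so the
        // value is read back as a Long even though it was written as an Integer
        Map<?, ?> roundTripped = (Map<?, ?>) deserialized.value();
        System.out.println(roundTripped.get("position").getClass()); // class java.lang.Long
    }
}
```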
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1261948519

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
        : ExactlyOnceSupport.UNSUPPORTED;
}

+@Override
+public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+    AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+    String filename = config.getString(FILE_CONFIG);
+    if (filename == null || filename.isEmpty()) {
+        // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+        throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                "This is because stdin is used for input and offsets are not tracked.");
+    }
+
+    // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+    // However, there could also be source partitions from previous configurations of the connector.
+    for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+        Map<String, ?> partition = partitionOffset.getKey();
+        if (partition == null) {
+            throw new ConnectException("Partition objects cannot be null");
+        }
+
+        if (!partition.containsKey(FILENAME_FIELD)) {
+            throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+        }
+
+        Map<String, ?> offset = partitionOffset.getValue();
+        // null offsets are allowed and represent a deletion of offsets for a partition
+        if (offset == null) {
+            return true;
+        }
+
+        if (!offset.containsKey(POSITION_FIELD)) {
+            throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+        }
+
+        // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+        try {
+            long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
> Hmmm... wouldn't that be a pretty serious breaking change if we accidentally switched up how the JSON converter deserializes integer types? Not just for the file source connector, but for plenty of others.

Okay, that's fair enough, I've changed the check in `FileStreamSourceConnector::alterOffsets` to mirror the one made in the task at startup for consistency (and avoided making changes in the existing task logic). This does mean that this PR should be merged after https://github.com/apache/kafka/pull/14003 has been merged (assuming that that approach is acceptable).

> I don't know if this significantly changes the conversation but it seems subtle and counterintuitive enough to bring up so that we can avoid accidentally breaking connector code that relies on this behavior.

Hm yeah, that's definitely another interesting one to bring up - however, I'd contend that that one kinda makes sense since we're passing the `SourceRecord` itself - tasks already deal with `SourceRecord` and their offsets (and associated types) in their regular lifecycle. It would be highly confusing if the `SourceRecord` that they get in `commitRecord` doesn't match the one they dispatched to the framework via `poll`. Of course, ideally, the offsets that they read via `OffsetStorageReader` should also not have any type mismatches compared to the `SourceRecord` ones, but I don't think we'd want to (or safely could) change that at this point.
Since the offsets being altered externally would correspond to the ones that the connector / tasks read at startup, I think it makes sense to align the types across invocations to `SourceConnector::alterOffsets` and offsets queried from an `OffsetStorageReader` (and an implicit separate alignment between the `SourceRecord`'s offsets types).
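For reference, the startup-time check in `FileStreamSourceTask` that the `alterOffsets` validation above is being aligned with looks roughly like this (paraphrased, not copied verbatim from the task):

```
// Paraphrased sketch of FileStreamSourceTask's offset check at startup
Map<String, Object> offset = context.offsetStorageReader()
        .offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
    Object lastRecordedOffset = offset.get(POSITION_FIELD);
    // After the JSON round trip through the offsets topic, a valid position is a Long
    if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
        throw new ConnectException("Offset position is the incorrect type");
}
```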
[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742624#comment-17742624 ]

hudeqi commented on KAFKA-14912:
--------------------------------

Hi [~divijvaidya] [~manyanda], I am also interested. If you are busy later, I can provide alternative support. :D

> Introduce a configuration for remote index cache size, preferably a dynamic config.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14912
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14912
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>            Reporter: Satish Duggana
>            Assignee: Manyanda Chitimbo
>            Priority: Major
>
> Context: We need to make the 1024 value here [1] as dynamically configurable
> [1] https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119
[GitHub] [kafka] mjsax commented on a diff in pull request #13996: KAFKA-15022: [2/N] introduce graph to compute min cost
mjsax commented on code in PR #13996: URL: https://github.com/apache/kafka/pull/13996#discussion_r1261835084

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
## @@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class Graph<V extends Comparable<V>> {
+    public class Edge implements Comparable<Edge> {
+        final V destination;
+        final int capacity;
+        final int cost;
+        int residualFlow;
+        int flow;
+        Edge counterEdge;
+
+        public Edge(final V destination, final int capacity, final int cost, final int residualFlow, final int flow) {
+            Objects.requireNonNull(destination);
+            this.destination = destination;
+            this.capacity = capacity;
+            this.cost = cost;
+            this.residualFlow = residualFlow;
+            this.flow = flow;
+        }
+
+        @Override
+        public int compareTo(final Edge o) {

Review Comment:
`compareTo` is to establish an order, right? Why do we order by `(destination, capacity, cost)`; does it matter, or could we use any order as long as it's deterministic?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
## @@ -0,0 +1,363 @@
+        @Override
+        public int compareTo(final Edge o) {
+            int compare = destination.compareTo(o.destination);
+            if (compare != 0) {
+                return compare;
+            }
+
+            compare = capacity - o.capacity;
+            if (compare != 0) {
+                return compare;
+            }
+
+            return cost - o.cost;
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (other == null || other.getClass() != getClass()) {
+                return false;
+            }
+
+            final Graph.Edge otherEdge = (Graph.Edge) other;
+
+            return destination.equals(otherEdge.destination) && capacity == otherEdge.capacity
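One consideration behind the ordering question above: `Edge` instances live in sorted collections (the class imports `TreeMap`/`TreeSet`), and in those collections `compareTo == 0` means "duplicate", so whatever order is chosen must distinguish every pair of distinct edges. A standalone illustration with simplified types (not the actual `Graph.Edge`):

```
import java.util.TreeSet;

public class OrderMatters {
    // Simplified stand-in for Graph.Edge: ordered by destination, then capacity, then cost
    record E(int destination, int capacity, int cost) implements Comparable<E> {
        @Override
        public int compareTo(E o) {
            if (destination != o.destination) return Integer.compare(destination, o.destination);
            if (capacity != o.capacity) return Integer.compare(capacity, o.capacity);
            return Integer.compare(cost, o.cost);
        }
    }

    public static void main(String[] args) {
        TreeSet<E> edges = new TreeSet<>();
        edges.add(new E(1, 5, 2));
        edges.add(new E(1, 5, 3)); // distinct edge, kept only because cost participates in the order
        System.out.println(edges.size()); // prints 2; dropping cost from compareTo would leave 1
    }
}
```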
[GitHub] [kafka] hni61223 opened a new pull request, #14009: MINOR: Add dual write offset metric
hni61223 opened a new pull request, #14009: URL: https://github.com/apache/kafka/pull/14009

- Add dual write offset metric for ZK migration
- Tested in jconsole

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742602#comment-17742602 ]

Lan Ding commented on KAFKA-15169:
----------------------------------

Sure, I will add tests for RemoteIndexCache in the coming days.

> Add tests for RemoteIndexCache
> ------------------------------
>
>                 Key: KAFKA-15169
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15169
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Satish Duggana
>            Priority: Major
>              Labels: KIP-405
>             Fix For: 3.6.0
>
> Follow-up from https://github.com/apache/kafka/pull/13275#discussion_r1257490978
[jira] [Assigned] (KAFKA-15169) Add tests for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lan Ding reassigned KAFKA-15169:
--------------------------------

    Assignee: Lan Ding

> Add tests for RemoteIndexCache
> ------------------------------
>
>                 Key: KAFKA-15169
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15169
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Satish Duggana
>            Assignee: Lan Ding
>            Priority: Major
>              Labels: KIP-405
>             Fix For: 3.6.0
>
> Follow-up from https://github.com/apache/kafka/pull/13275#discussion_r1257490978
[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic
junrao commented on code in PR #13947: URL: https://github.com/apache/kafka/pull/13947#discussion_r1261801130

## core/src/main/scala/kafka/server/ReplicaManager.scala:
## @@ -500,11 +504,13 @@
        // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
        // This could happen when topic is being deleted while broker is down and recovers.
        stoppedPartitions += topicPartition -> deletePartition
+       if (remoteLogManager.isDefined)
+           partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
Hmm, it seems this case can occur during partition reassignment. In that case, we don't want to delete the remote data, right?

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
+public void cleanupDeletedRemoteLogSegments() {

Review Comment:
This process runs on every replica. So, we will be deleting the same remote segment multiple times?

## core/src/main/scala/kafka/server/ReplicaManager.scala:
## @@ -526,14 +532,17 @@
/**
 * Stop the given partitions.
 *
 * @param partitionsToStop              A map from a topic partition to a boolean indicating
 *                                      whether the partition should be deleted.
 * @param partitionsMaybeToDeleteRemote A set of topic partitions that may need to delete
 *                                      remote segments.
 *
 * @return A map from partitions to exceptions which occurred.
 *         If no errors occurred, the map will be empty.
 */
protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
It seems that the implementation doesn't support KRaft controller. Do we plan to support that for the 3.6.0 release?

## core/src/main/scala/kafka/log/LogManager.scala:
## @@ -1159,6 +1161,9 @@
        checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
    }
    addLogToBeDeleted(removedLog)
+   if (deleteRemote && removedLog.remoteLogEnabled())
+       RemoteLogManager.addTopicIdToBeDeleted(removedLog.topicIdAsJava)

Review Comment:
LogManager only manages local data. So, it's a bit weird to have it call RemoteLogManager.

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -556,6 +562,46 @@
+public void cleanupDeletedRemoteLogSegments() {
+    if (isCancelled())
+        return;
+
+    Uuid topicId = topicIdPartition.topicId();
+    if (deletedTopicIds.contains(topicId)) {
+        cleanupAllRemoteLogSegments();
+        cancelRLMtask();
+        deletedTopicIds.remove(topicId);
+    }
+}
+
+private void cleanupAllRemoteLogSegments() {

Review Comment:
Since this runs asynchronously after topic deletion completes, if every replica is restarted before all remote segments are deleted, we will never be able to remove the remaining remote segments for the deleted topics?
[GitHub] [kafka] mjsax closed pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
mjsax closed pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora… URL: https://github.com/apache/kafka/pull/11433
[jira] [Resolved] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-13295.
-------------------------------------
    Resolution: Fixed

With the new restore-thread, this issue should be resolved implicitly.

> Long restoration times for new tasks can lead to transaction timeouts
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-13295
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13295
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Sagar Rao
>            Priority: Critical
>              Labels: eos, new-streams-runtime-should-fix
>
> In some EOS applications with relatively long restoration times we've noticed a series of ProducerFencedExceptions occurring during/immediately after restoration. The broker logs were able to confirm these were due to transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling {{send}} (if there isn't already one in flight). A {{send}} occurs often outside a commit during active processing (eg writing to the changelog), leaving the txn open until the next commit. And if a StreamThread has been actively processing when a rebalance results in a new stateful task without revoking any existing tasks, the thread won't actually commit this open txn before it goes back into the restoration phase while it builds up state for the new task. So the in-flight transaction is left open during restoration, during which the StreamThread only consumes from the changelog without committing, leaving it vulnerable to timing out when restoration times exceed the configured transaction.timeout.ms for the producer client.
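For context, the configuration knobs involved in this failure mode sit on the Streams-managed producer; a minimal illustration (values are examples, not recommendations):

```
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosTimeoutConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // EOS: every send() runs inside a transaction managed by Streams
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // The window the broker allows a transaction to stay open; restoration that
        // outlasts this window is what fences the producer in the scenario above
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 60_000);
    }
}
```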
[jira] [Updated] (KAFKA-15184) New consumer internals refactoring and clean up
[ https://issues.apache.org/jira/browse/KAFKA-15184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15184:
------------------------------
    Description: Minor refactoring of the new consumer internals including introduction of the {{RequestManagers}} class to hold references to the {{RequestManager}} instances.  (was: Minor refactoring of the new consumer internals including introduction of the RequestManagers class to hold the various RequestManager instances.)

> New consumer internals refactoring and clean up
> -----------------------------------------------
>
>                 Key: KAFKA-15184
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15184
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>              Labels: kip-945
>
> Minor refactoring of the new consumer internals including introduction of the {{RequestManagers}} class to hold references to the {{RequestManager}} instances.
[GitHub] [kafka] hni61223 closed pull request #14007: MINOR: Add metrics for ZK migration
hni61223 closed pull request #14007: MINOR: Add metrics for ZK migration URL: https://github.com/apache/kafka/pull/14007
[jira] [Created] (KAFKA-15184) New consumer internals refactoring and clean up
Kirk True created KAFKA-15184:
---------------------------------

             Summary: New consumer internals refactoring and clean up
                 Key: KAFKA-15184
                 URL: https://issues.apache.org/jira/browse/KAFKA-15184
             Project: Kafka
          Issue Type: Task
          Components: clients, consumer
            Reporter: Kirk True
            Assignee: Kirk True

Minor refactoring of the new consumer internals including introduction of the RequestManagers class to hold the various RequestManager instances.
[GitHub] [kafka] philipnee commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()
philipnee commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1261798307

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
## @@ -522,7 +525,35 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)

@Override
public void assign(Collection<TopicPartition> partitions) {
-    throw new KafkaException("method not implemented");
+    if (partitions == null) {
+        throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
+    }
+
+    if (partitions.isEmpty()) {
+        this.unsubscribe();
+        return;
+    }
+
+    for (TopicPartition tp : partitions) {
+        String topic = (tp != null) ? tp.topic() : null;
+        if (Utils.isBlank(topic))
+            throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
+    }
+    // TODO: implement fetcher
+    // fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+    // make sure the offsets of topic partitions the consumer is unsubscribing from
+    // are committed since there will be no following rebalance
+    commit(subscriptions.allConsumed());

Review Comment:
Hey @junrao - Sorry, I misunderstood your concern.

To your first question: if the coordinator is not available, the commit request manager won't be built and this commit() will be skipped. In `RequestManagers.java` we've got

```
if (groupRebalanceConfig != null && groupRebalanceConfig.groupId != null) {
    final GroupState groupState = new GroupState(groupRebalanceConfig);
    coordinator = new CoordinatorRequestManager(time, logContext, retryBackoffMs, errorEventHandler, groupState.groupId);
    commit = new CommitRequestManager(time, logContext, subscriptions, config, coordinator, groupState);
}
```

To your second question: autoCommit is still guarded by the config, per this line in `CommitRequestManager.java`:

```
private void maybeAutoCommit() {
    if (!autoCommitState.isPresent()) {
        return;
    }
    // autocommit otherwise
}
```

On the Consumer API level, we treat sync/async commit the same, except sync commit waits for the completion of the future.
[GitHub] [kafka] junrao commented on pull request #1864: KAFKA-4177: Remove ThrottledReplicationRateLimit from Server Config
junrao commented on PR #1864: URL: https://github.com/apache/kafka/pull/1864#issuecomment-1633289594

@nkostoulas : Since this requires a config change, it will need a KIP.
[GitHub] [kafka] junrao commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()
junrao commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1261763698

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
## @@ -522,7 +525,35 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)
+    // make sure the offsets of topic partitions the consumer is unsubscribing from
+    // are committed since there will be no following rebalance
+    commit(subscriptions.allConsumed());

Review Comment:
@philipnee: `KafkaConsumer.assign()` has the following code.

```
// make sure the offsets of topic partitions the consumer is unsubscribing from
// are committed since there will be no following rebalance
if (coordinator != null)
    this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
```

1. In the old code, if the groupId is not specified, coordinator will be null and the offset commit will be skipped. In this PR, we always call offset commit.
2. In the old code, the implementation of `maybeAutoCommitOffsetsAsync()` is guarded by the `ENABLE_AUTO_COMMIT_CONFIG` config. In this PR, there is no such guard.

Are both changes expected?
[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jolshan commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261758965

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
## @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+    private static final long WORK_TIMEOUT_MS = 200L;

Review Comment:
Yeah, I got confused by this as well. That was my understanding: the clock doesn't actually advance unless we expire an event, and we only advance it to that time.
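For readers following this thread, here is a toy analogue of the behavior being described (not the real Kafka `SystemTimer`, whose source lives in `org.apache.kafka.server.util.timer`): the wheel's clock only moves when a bucket actually expires, and then only to that bucket's expiration time.

```
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Toy analogue of SystemTimer.advanceClock; illustrative only
public class ToyTimer {
    static class Bucket implements Delayed {
        final long expirationMs;
        Bucket(long expirationMs) { this.expirationMs = expirationMs; }
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final DelayQueue<Bucket> delayQueue = new DelayQueue<>();
    private long wheelClockMs = 0;

    boolean advanceClock(long timeoutMs) throws InterruptedException {
        Bucket bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (bucket == null) {
            return false; // nothing expired within the wait; the wheel's clock is untouched
        }
        wheelClockMs = bucket.expirationMs; // advance only to the expired bucket's time
        // (the real timer would also flush the bucket's tasks and drain further expired buckets)
        return true;
    }
}
```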
[GitHub] [kafka] hni61223 opened a new pull request, #14007: KMETA-988: Add metrics for ZK migration
hni61223 opened a new pull request, #14007: URL: https://github.com/apache/kafka/pull/14007

- Add dual write offset metric for ZK migration
- Tested in jconsole

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
ruslankrivoshein commented on PR #13562: URL: https://github.com/apache/kafka/pull/13562#issuecomment-1633242202

@fvaleri, I worked a bit after your comments and still have a few questions. First, please tell me how you prepare your environment to run `bin/kafka-get-offsets.sh`. Is there any guide or quickstart about it? I don't know yet how to test my edits in that file correctly. Then I need to know which options are considered deprecated; this is for the improvement you suggested.
[GitHub] [kafka] mumrah closed pull request #14006: MINOR Add metrics for ZK migration
mumrah closed pull request #14006: MINOR Add metrics for ZK migration URL: https://github.com/apache/kafka/pull/14006
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jeffkbkim commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261730522

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
## @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+    private static final long WORK_TIMEOUT_MS = 200L;

Review Comment:
To confirm: we don't actually advance the timer by 200ms every iteration, but wait at most 200ms if there are no tasks to expire. This is more of a question on the system timer, but in `SystemTimer#advanceClock()`

```
timingWheel.advanceClock(bucket.getExpiration());
```

how is the timer in sync with the actual time if we advance the clock by a custom amount?

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
## @@ -0,0 +1,81 @@
+    class Reaper extends ShutdownableThread {
+        Reaper(String name) {
+            super(name, false);
+        }
+
+        @Override
+        public void doWork() {
+            try {
+                timer.advanceClock(WORK_TIMEOUT_MS);
+            } catch (InterruptedException ex) {
+                // Ignore.
+            }
+        }
+    }
+
+    private Timer timer;
+    private Reaper reaper;

Review Comment:
can these be final?
[jira] [Comment Edited] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742554#comment-17742554 ]

Manyanda Chitimbo edited comment on KAFKA-14912 at 7/12/23 8:35 PM:
--------------------------------------------------------------------

Hi [~divijvaidya] Yes, I'll be interested in picking this up. Thanks for suggesting it to me. I can start to have a look at it tomorrow. I'll re-assign the issue to myself.

was (Author: JIRAUSER299903):
Hi [~divijvaidya] Yes, I'll be interested in picking this up. I can start to have a look at it tomorrow. I'll re-assign the issue to myself.

> Introduce a configuration for remote index cache size, preferably a dynamic config.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14912
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14912
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>            Reporter: Satish Duggana
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>
> Context: We need to make the 1024 value here [1] as dynamically configurable
> [1] https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119
[jira] [Assigned] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manyanda Chitimbo reassigned KAFKA-14912:
-----------------------------------------

    Assignee: Manyanda Chitimbo  (was: Kamal Chandraprakash)

> Introduce a configuration for remote index cache size, preferably a dynamic config.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14912
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14912
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>            Reporter: Satish Duggana
>            Assignee: Manyanda Chitimbo
>            Priority: Major
>
> Context: We need to make the 1024 value here [1] as dynamically configurable
> [1] https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119
[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742554#comment-17742554 ]

Manyanda Chitimbo commented on KAFKA-14912:
-------------------------------------------

Hi [~divijvaidya] Yes, I'll be interested in picking this up. I can start to have a look at it tomorrow. I'll re-assign the issue to myself.

> Introduce a configuration for remote index cache size, preferably a dynamic config.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14912
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14912
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>            Reporter: Satish Duggana
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>
> Context: We need to make the 1024 value here [1] as dynamically configurable
> [1] https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259040832

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
## @@ -228,14 +261,21 @@ public List build(TopicsImage topicsImage) {

static class GroupMetadataManagerTestContext {
    static class Builder {
-       final private Time time = new MockTime();
        final private LogContext logContext = new LogContext();
        final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
        private MetadataImage metadataImage;
-       private List assignors;

Review Comment:
We get an IllegalStateException if it's not initialized, and since it doesn't affect the old protocol I thought it best to initialize it here.
[GitHub] [kafka] hni61223 opened a new pull request, #14006: KMETA-988: Add metrics for ZK migration
hni61223 opened a new pull request, #14006: URL: https://github.com/apache/kafka/pull/14006

- Add dual write offset metric for ZK migration
- Tested in jconsole

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Divij Vaidya updated KAFKA-14912:
---------------------------------
    Description:
Context: We need to make the 1024 value here [1] as dynamically configurable

[1] https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119

> Introduce a configuration for remote index cache size, preferably a dynamic config.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14912
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14912
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>            Reporter: Satish Duggana
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>
> Context: We need to make the 1024 value here [1] as dynamically configurable
> [1] https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119
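A rough sketch of the shape such a dynamic config typically takes (illustrative names only, not the actual `RemoteIndexCache` API): hold the capacity in a mutable field and let a reconfiguration callback resize it and re-enforce the bound.

```
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch, not the real RemoteIndexCache: a capacity that a
// dynamic-config listener can change at runtime
public class ResizableCapacity {
    private final AtomicLong maxEntries = new AtomicLong(1024); // current hard-coded default

    // Called from a dynamic-config update (e.g., a broker reconfigurable)
    public void resize(long newMax) {
        maxEntries.set(newMax);
        enforceBound();
    }

    private void enforceBound() {
        // evict least-recently-used entries while the cache holds more than maxEntries.get()
    }
}
```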
[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742550#comment-17742550 ]

Divij Vaidya commented on KAFKA-14912:
--------------------------------------

[~hudeqi] [~manyanda] [~ivanyu] are you folks interested in picking this one up?

> Introduce a configuration for remote index cache size, preferably a dynamic config.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14912
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14912
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>            Reporter: Satish Duggana
>            Assignee: Kamal Chandraprakash
>            Priority: Major
[GitHub] [kafka] gharris1727 commented on pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on PR #13971: URL: https://github.com/apache/kafka/pull/13971#issuecomment-1633140141

@C0urante Could you review this? Thanks!
[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime
gharris1727 commented on PR #13313: URL: https://github.com/apache/kafka/pull/13313#issuecomment-1633138815

Hey @ijuma Do you have the system test results for this branch?
[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
C0urante commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1261646923

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+        // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+        try {
+            long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
Hmmm... wouldn't that be a pretty serious breaking change if we accidentally switched up how the JSON converter deserializes integer types? Not just for the file source connector, but for plenty of others.

It feels like it might be a better use of our time to make note of this possibility and ensure that we have sufficient unit testing in place to prevent that kind of regression (I suspect we already do but haven't verified this yet).

Of course, because things aren't interesting enough already--it turns out that there's actually two different scenarios in which tasks observe offsets for their connector. The first, which we're all familiar with, is when they query them using an [OffsetStorageReader](https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java), which in distributed mode reflects the contents of the offsets topic. The second is when [SourceTask::commitRecord](https://kafka.apache.org/35/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata)) is invoked, which carries with it the just-ack'd `SourceRecord` instance originally provided by the task, including the original in-memory source partition and source offset, which may use types that get lost when written to and read back from the offsets topic.

I don't know if this significantly changes the conversation but it seems subtle and counterintuitive enough to bring up so that we can avoid accidentally breaking connector code that relies on this behavior.
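A hedged, runnable sketch of that second scenario (the class name and offset fields here are invented for the example): the record handed to `commitRecord` still carries the in-memory offset types from `poll`, while a read through an `OffsetStorageReader` would reflect the JSON round trip.

```
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class ExampleTask extends SourceTask {
    @Override
    public List<SourceRecord> poll() {
        Map<String, Object> offset = Collections.singletonMap("position", 10); // Integer in memory
        return Collections.singletonList(new SourceRecord(
                Collections.singletonMap("file", "test.txt"), offset,
                "example-topic", Schema.STRING_SCHEMA, "value"));
    }

    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        // Still the original Integer here; reading the same offset back through an
        // OffsetStorageReader would yield a Long after the JSON round trip
        Object position = record.sourceOffset().get("position");
    }

    @Override public String version() { return "0"; }
    @Override public void start(Map<String, String> props) { }
    @Override public void stop() { }
}
```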
[GitHub] [kafka] C0urante commented on pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors
C0urante commented on PR #14005: URL: https://github.com/apache/kafka/pull/14005#issuecomment-1633085269 @yashmayya do you have time to take a look at this? Feel like it may be useful to you with the work we were discussing on https://github.com/apache/kafka/pull/13945 about how we want to set examples with our implementation of this API, and possibly modifying the types of the objects that the framework passes to the `alterOffsets` method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 merged pull request #13992: MINOR: Avoid slow Set.removeAll(List) in MirrorSourceConnector
gharris1727 merged PR #13992: URL: https://github.com/apache/kafka/pull/13992 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
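For context on the performance issue named in the PR title: `AbstractSet.removeAll` iterates the set and probes the argument collection whenever the set is not the larger of the two, which is quadratic when the argument is a `List`. A minimal sketch of the pitfall and the usual fix (illustrative only; the PR's actual change may differ):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemoveAllDemo {
    public static void main(String[] args) {
        Set<Integer> set = new HashSet<>();
        List<Integer> toRemove = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            set.add(i);
            toRemove.add(i);
        }

        // Slow: since set.size() <= toRemove.size(), AbstractSet.removeAll
        // iterates the set and calls List.contains for every element,
        // giving O(n * m) behavior.
        // set.removeAll(toRemove);

        // Fast: copy the list into a HashSet first so each contains() is O(1).
        set.removeAll(new HashSet<>(toRemove));
        System.out.println(set.size()); // 0
    }
}
```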
[GitHub] [kafka] C0urante commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors
C0urante commented on code in PR #14005: URL: https://github.com/apache/kafka/pull/14005#discussion_r1261630589

## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##

```
@@ -597,7 +596,9 @@ private Set<TopicPartition> listPartitions(
         Admin admin,
         Collection<String> topics
     ) throws TimeoutException, InterruptedException, ExecutionException {
-        assertFalse("collection of topics may not be empty", topics.isEmpty());
```

Review Comment: Removed assertions from this API as they added an implicit dependency on JUnit 4, which we don't use in the `:connect:mirror` module.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
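The JUnit-free replacement for this kind of guard is typically a plain runtime check. A hedged sketch of the pattern (the PR's actual replacement may differ):

```java
import java.util.Collection;

final class Preconditions {
    // Plain runtime check instead of JUnit's assertFalse(...), so shared test
    // utilities don't drag an implicit JUnit 4 dependency into other modules.
    static void ensureNotEmpty(Collection<String> topics) {
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("collection of topics may not be empty");
        }
    }
}
```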
[GitHub] [kafka] gharris1727 commented on pull request #13992: MINOR: Avoid slow Set.removeAll(List) in MirrorSourceConnector
gharris1727 commented on PR #13992: URL: https://github.com/apache/kafka/pull/13992#issuecomment-1633079621 The flaky CI failures look unrelated and the mirror unit tests pass locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante opened a new pull request, #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors
C0urante opened a new pull request, #14005: URL: https://github.com/apache/kafka/pull/14005 [Jira](https://issues.apache.org/jira/browse/KAFKA-15177), [KIP-875](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect)

### Summary

Implements the new `alterOffsets` method in all three MM2 connectors. The only connector that makes use of the Connect framework-managed offsets at runtime is the `MirrorSourceConnector`, but since the other two connectors also emit offsets with their records, we allow users to add/remove offsets for them as well, as long as the offsets match the format of the ones emitted by the connectors.

### Integration testing

An integration test case is added (or really, augmented) that focuses primarily on the `MirrorSourceConnector`. This case piggybacks off of the existing `MirrorConnectorsExactlyOnceIntegrationTest::testReplication` test case, which is done for a few reasons:

- Every new case added to the `MirrorConnectorsIntegrationBaseTest` is run by that class and all of its subclasses (of which there are currently five), which can lead to bloat in testing runtime
- Graceful shutdown of MM2 tasks may not always occur in a timely fashion, which can cause them to emit stale offsets after they are supposed to have been stopped; running with exactly-once support eliminates this risk
- Some subclasses of the `MirrorConnectorsIntegrationBaseTest` (specifically, the `MirrorConnectorsIntegrationTransactionsTest` suite) cause unpredictable behavior with offsets that can make it harder to verify an exact number of records in a replicated topic due to, e.g., control records

Still, this is a little hacky. Happy to change things up if we think it's worth the work to find something cleaner.

### Committer Checklist (excluded from commit message)

- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
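For reference, "the offsets match the format of the ones emitted by the connectors" can be pictured with a small validation sketch. The key names here ("cluster", "topic", "partition", "offset") are assumptions based on the source partitions and offsets MirrorMaker 2 emits, not a quote of the PR:

```java
import java.util.Map;

final class MirrorOffsetFormatSketch {
    // Assumed shape: MM2 source partitions carry the source cluster, topic, and
    // partition; offsets carry a single upstream "offset" value.
    static boolean looksValid(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        boolean partitionOk = sourcePartition != null
                && sourcePartition.containsKey("cluster")
                && sourcePartition.containsKey("topic")
                && sourcePartition.containsKey("partition");
        // A null offset map represents deletion of the offsets for that partition.
        boolean offsetOk = sourceOffset == null || sourceOffset.containsKey("offset");
        return partitionOk && offsetOk;
    }
}
```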
[GitHub] [kafka] kamalcph opened a new pull request, #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache
kamalcph opened a new pull request, #14004: URL: https://github.com/apache/kafka/pull/14004

### Committer Checklist (excluded from commit message)

- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
kamalcph commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1261602111

## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java: ##

```java
/**
 * An implementation of {@link RemoteStorageManager} which relies on the local file system to store
 * offloaded log segments and associated data.
 *
 * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong
 * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded.
 */
```
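To situate the quoted class: a tiered-storage test wires a `RemoteStorageManager` implementation into the broker through the standard remote storage configs. A minimal sketch using those public config keys (the `LocalTieredStorage`-specific settings are omitted since their names aren't given here):

```java
import java.util.Properties;

final class TieredStorageTestConfig {
    static Properties brokerProps() {
        Properties props = new Properties();
        props.put("remote.log.storage.system.enable", "true");
        // Use the file-system-backed test RSM from the quoted hunk.
        props.put("remote.log.storage.manager.class.name",
                "org.apache.kafka.server.log.remote.storage.LocalTieredStorage");
        props.put("remote.log.metadata.manager.class.name",
                "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager");
        return props;
    }
}
```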
[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
kamalcph commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1261600532

## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageHistory.java: ##

```java
/**
 * Accumulates and retains the interactions between brokers and {@link LocalTieredStorage} instances.
 * These interactions are modelled via events of type {@link LocalTieredStorageEvent}.
 *
 * Events from an instance of storage are captured by the {@link LocalTieredStorageHistory} after
 * {@link LocalTieredStorageHistory#listenTo(LocalTieredStorage)} is called.
 */
/* @ThreadSafe */
public final class LocalTieredStorageHistory {
```

Review Comment: Yes, you're right. History is maintained only in memory and is used to assert the interaction with the `RemoteStorageManager`. No plans so far to make it durable. This framework is mainly used to assert the functionality end-to-end rather than to stress-test, where there can be a huge number of events for a single test.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
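A hedged sketch of how a test might use this in-memory history to assert broker/RSM interactions. `listenTo(...)` comes from the javadoc above, while `latestEvent(...)` is an assumed query method, so treat this purely as a usage illustration:

```java
import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;

final class HistoryUsageSketch {
    static boolean segmentWasOffloaded(LocalTieredStorageHistory history,
                                       LocalTieredStorage storage,
                                       TopicPartition tp) {
        history.listenTo(storage); // start capturing this storage's events
        // ... run the workload that offloads a segment to the local tiered storage ...
        Optional<LocalTieredStorageEvent> event =
                history.latestEvent(LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT, tp);
        return event.isPresent();
    }
}
```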
[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
kamalcph commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1261592637

## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java: ##
[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
kamalcph commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1261590988

## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java: ##
[jira] [Updated] (KAFKA-14993) Improve TransactionIndex instance handling while copying to and fetching from RSM.
[ https://issues.apache.org/jira/browse/KAFKA-14993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-14993:
-----------------------------------------
Description: RSM should throw a ResourceNotFoundException if it does not have a TransactionIndex. Currently, it expects an empty InputStream and creates an unnecessary file in the cache. This can be avoided by catching ResourceNotFoundException and not creating an instance. There are minor cleanups needed in RemoteIndexCache and other TransactionIndex usages. Also, update the LocalTieredStorage; see [this|https://github.com/apache/kafka/pull/13837#discussion_r1258917584] comment.

was: RSM should throw a ResourceNotFoundException if it does not have a TransactionIndex. Currently, it expects an empty InputStream and creates an unnecessary file in the cache. This can be avoided by catching ResourceNotFoundException and not creating an instance. There are minor cleanups needed in RemoteIndexCache and other TransactionIndex usages.

> Improve TransactionIndex instance handling while copying to and fetching from RSM.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14993
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14993
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>            Reporter: Satish Duggana
>            Assignee: Kamal Chandraprakash
>            Priority: Major

-- This message was sent by Atlassian Jira (v8.20.10#820010)
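A hedged sketch of the handling the ticket proposes on the fetch side, using the public `RemoteStorageManager` API; the surrounding cache code and method name are illustrative:

```java
import java.io.InputStream;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;

final class TxnIndexFetchSketch {
    // Returns the transaction index stream, or null when the segment has none,
    // instead of materializing an empty file in the cache.
    static InputStream fetchTxnIndexIfPresent(RemoteStorageManager rsm,
                                              RemoteLogSegmentMetadata metadata)
            throws RemoteStorageException {
        try {
            return rsm.fetchIndex(metadata, IndexType.TRANSACTION);
        } catch (RemoteResourceNotFoundException e) {
            // The RSM signals "no transaction index" explicitly; skip creating one.
            return null;
        }
    }
}
```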
[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
kamalcph commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1261588861

## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java: ##
[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
kamalcph commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1261587596

## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java: ##
[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jolshan commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261578373

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java: ##

```java
package org.apache.kafka.coordinator.group.util;

import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;

public class SystemTimerReaperTest {
    private static class FutureTimerTask<T> extends TimerTask {
        CompletableFuture<T> future = new CompletableFuture<>();

        FutureTimerTask(long delayMs) {
            super(delayMs);
        }

        @Override
        public void run() {
            // We use org.apache.kafka.common.errors.TimeoutException to differentiate
            // from java.util.concurrent.TimeoutException.
            future.completeExceptionally(new TimeoutException(
                String.format("Future failed to be completed before timeout of %sMs ms was reached", delayMs)));
        }
    }

    private CompletableFuture<Void> add(Timer timer, long delayMs) {
        FutureTimerTask<Void> task = new FutureTimerTask<>(delayMs);
        timer.add(task);
        return task.future;
    }

    @Test
    public void testReaper() throws Exception {
        Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"));
        try {
            CompletableFuture<Void> t1 = add(timer, 100L);
            CompletableFuture<Void> t2 = add(timer, 200L);
            CompletableFuture<Void> t3 = add(timer, 300L);
            TestUtils.assertFutureThrows(t1, TimeoutException.class);
```

Review Comment: I'm a little confused at how we ensure that the given amount of time has passed.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
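On the timing question: `SystemTimer` is driven by real wall-clock time (its delay queue only releases a bucket once the bucket's expiration has passed), so a task cannot fire early, and blocking on the future waits until the reaper thread actually expires it. A hedged sketch of that flow, reusing the classes quoted above:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.coordinator.group.util.SystemTimerReaper;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;

public class ReaperTimingSketch {
    public static void main(String[] args) throws Exception {
        Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"));
        CompletableFuture<Void> future = new CompletableFuture<>();
        long start = System.currentTimeMillis();
        timer.add(new TimerTask(100L) { // same idea as the test's FutureTimerTask
            @Override
            public void run() {
                future.completeExceptionally(new RuntimeException("expired"));
            }
        });
        try {
            future.get(); // blocks until the reaper thread expires the task
        } catch (ExecutionException expected) {
            long elapsedMs = System.currentTimeMillis() - start;
            // elapsedMs >= ~100: SystemTimer releases a task only after its real
            // (wall-clock) expiration, so the delay is guaranteed to have passed.
            System.out.println("expired after " + elapsedMs + " ms");
        }
    }
}
```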
[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
kamalcph commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1261578158

## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java: ##
[jira] [Created] (KAFKA-15183) Add more controller, loader, snapshot emitter metrics
Colin McCabe created KAFKA-15183:
------------------------------------
             Summary: Add more controller, loader, snapshot emitter metrics
                 Key: KAFKA-15183
                 URL: https://issues.apache.org/jira/browse/KAFKA-15183
             Project: Kafka
          Issue Type: Improvement
            Reporter: Colin McCabe

Add the controller, loader, and snapshot emitter metrics from KIP-938.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
kamalcph commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1261567166

## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java: ##
[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jolshan commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261559993

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##

```
@@ -2360,6 +2360,19 @@ public void testGroupIdsByTopics() {
     assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
 }

+@Test
+public void testOnNewMetadataImageWithEmptyDelta() {
```

Review Comment: This is the test for the ofNullable change? Do we also need to check that we don't notify any groups?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
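A hedged reading of "the ofNullable change" being exercised here. The metadata image types are the real `org.apache.kafka.image` classes (and `MetadataDelta.topicsDelta()` can legitimately return null), but the notify helper and the exact guard are assumptions, not quotes from the PR:

```java
import java.util.Optional;

import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicsDelta;

abstract class MetadataImageListenerSketch {
    void onNewMetadataImage(MetadataImage image, MetadataDelta delta) {
        // MetadataDelta.topicsDelta() may be null when no topics changed;
        // Optional.ofNullable skips notification entirely in that case, so an
        // empty delta notifies no groups, which is what the test would verify.
        Optional.ofNullable(delta.topicsDelta()).ifPresent(this::notifyGroupsOfTopicChanges);
    }

    abstract void notifyGroupsOfTopicChanges(TopicsDelta topicsDelta); // hypothetical helper
}
```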
[GitHub] [kafka] kamalcph commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
kamalcph commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1261560074

## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java: ##
[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jolshan commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261554932 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.util; + +import org.apache.kafka.server.util.ShutdownableThread; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +/** + * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread + * to expire the tasks in the {@link Timer}. + */ +public class SystemTimerReaper implements Timer { +private static final long WORK_TIMEOUT_MS = 200L; + +class Reaper extends ShutdownableThread { +Reaper(String name) { +super(name, false); +} + +@Override +public void doWork() { +try { +timer.advanceClock(WORK_TIMEOUT_MS); Review Comment: How did we decide upon 200? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jolshan commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261554932 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.util; + +import org.apache.kafka.server.util.ShutdownableThread; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +/** + * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread + * to expire the tasks in the {@link Timer}. + */ +public class SystemTimerReaper implements Timer { +private static final long WORK_TIMEOUT_MS = 200L; + +class Reaper extends ShutdownableThread { +Reaper(String name) { +super(name, false); +} + +@Override +public void doWork() { +try { +timer.advanceClock(WORK_TIMEOUT_MS); Review Comment: How did we decide upon 200? Is my understanding correct that we only execute the event if its expiration is within 200 ms of the current time? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
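Editor's note: the reaper pattern under discussion is a thread that repeatedly advances a timer's clock so that queued tasks actually get expired. Below is a minimal, self-contained sketch of that idea, built on a toy `SimpleTimer` over `DelayQueue` rather than Kafka's real hierarchical-timing-wheel `SystemTimer`. As far as I can tell, the 200 ms argument acts as a poll timeout (an upper bound on how long the thread blocks waiting for the next expiring task), not a window restricting which tasks are eligible, but treat that reading as unverified.

```
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Toy stand-in for Kafka's SystemTimer; the real one uses a hierarchical timing wheel.
class SimpleTimer {
    private final DelayQueue<ExpiringTask> queue = new DelayQueue<>();

    void add(ExpiringTask task) {
        queue.add(task);
    }

    // Blocks for at most timeoutMs waiting for the next task to expire, then runs it.
    // The timeout bounds how long we wait, not which tasks are eligible to run.
    void advanceClock(long timeoutMs) throws InterruptedException {
        ExpiringTask task = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (task != null) {
            task.run();
        }
    }
}

class ExpiringTask implements Delayed, Runnable {
    private final long deadlineMs;
    private final Runnable action;

    ExpiringTask(long delayMs, Runnable action) {
        this.deadlineMs = System.currentTimeMillis() + delayMs;
        this.action = action;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public void run() {
        action.run();
    }
}

public class ReaperSketch {
    public static void main(String[] args) throws InterruptedException {
        SimpleTimer timer = new SimpleTimer();
        timer.add(new ExpiringTask(500, () -> System.out.println("task expired")));
        // The reaper loop: without some thread calling advanceClock, nothing ever expires.
        for (int i = 0; i < 5; i++) {
            timer.advanceClock(200L); // wakes up at least every 200 ms even when idle
        }
    }
}
```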
[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742508#comment-17742508 ] Kamal Chandraprakash commented on KAFKA-14912: -- I'm not working on this one currently. We can reassign it. > Introduce a configuration for remote index cache size, preferably a dynamic > config. > --- > > Key: KAFKA-14912 > URL: https://issues.apache.org/jira/browse/KAFKA-14912 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Satish Duggana >Assignee: Kamal Chandraprakash >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya commented on pull request #13997: KAFKA-15180: Generalize integration tests to change use of KafkaConsumer to Consumer
divijvaidya commented on PR #13997: URL: https://github.com/apache/kafka/pull/13997#issuecomment-1632968446 Unrelated test failures: ``` [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_11_and_Scala_2_13___testReplication__/) [Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders__/) [Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders___2/) [Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/) [Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_17_and_Scala_2_13___testReplicateSourceDefault__/) [Build / JDK 20 and Scala 2.13 / org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.testConcurrentlyAccessThreads()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.streams.integration/AdjustStreamThreadCountTest/Build___JDK_20_and_Scala_2_13___testConcurrentlyAccessThreads__/) [Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/) [Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13997/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders___2/) ``` I will leave this PR around for some time in case someone wants to add additional comments. Will merge after 2-3 days. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jolshan commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261521364 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.util; + +import org.apache.kafka.server.util.ShutdownableThread; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +/** + * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread + * to expire the tasks in the {@link Timer}. + */ +public class SystemTimerReaper implements Timer { Review Comment: We needed this because there wasn't a thread actually running the timer? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1261515978 ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " + +"This is because stdin is used for input and offsets are not tracked."); +} + +// This connector makes use of a single source partition at a time which represents the file that it is configured to read from. +// However, there could also be source partitions from previous configurations of the connector. +for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) { +Map<String, ?> partition = partitionOffset.getKey(); +if (partition == null) { +throw new ConnectException("Partition objects cannot be null"); +} + +if (!partition.containsKey(FILENAME_FIELD)) { +throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); +} + +Map<String, ?> offset = partitionOffset.getValue(); +// null offsets are allowed and represent a deletion of offsets for a partition +if (offset == null) { +return true; +} + +if (!offset.containsKey(POSITION_FIELD)) { +throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'"); +} + +// The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value +try { +long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD))); Review Comment: Since the type alignment issue seemed like a broader one (i.e. not scoped to the file connectors being touched here), I've created a separate [ticket](https://issues.apache.org/jira/browse/KAFKA-15182) and [PR](https://github.com/apache/kafka/pull/14003) for it. > it doesn't seem like a great endorsement of our API if we have to implement workarounds in the file connectors, which are the first example of the connector API that many developers see. I'd argue that it isn't really a workaround and that the current check itself is bad. If the (de)serialization happened to use `Integer` for values that fit in a 32-bit signed type (which would be perfectly valid and is exactly what we see currently before the values are passed through the converter), the current check in the `FileStreamSourceTask` would cause it to bail. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
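Editor's note: to make the type-drift concern concrete, the following sketch uses Jackson directly (purely for illustration; Connect's own `JsonConverter` has its own conversion rules) to show that the runtime type of a round-tripped integer depends on its magnitude, which is why `instanceof Long`-style checks on deserialized offsets are brittle.

```
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class JsonTypeDriftSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Write a Long-valued offset, then read it back without type information.
        byte[] small = mapper.writeValueAsBytes(Map.of("position", 10L));
        Map<?, ?> smallMap = mapper.readValue(small, Map.class);
        // Jackson picks the narrowest fitting type: 10 comes back as an Integer.
        System.out.println(smallMap.get("position").getClass()); // class java.lang.Integer

        // A value too big for int comes back as a Long.
        byte[] big = mapper.writeValueAsBytes(Map.of("position", 10_000_000_000L));
        Map<?, ?> bigMap = mapper.readValue(big, Map.class);
        System.out.println(bigMap.get("position").getClass()); // class java.lang.Long
    }
}
```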
[GitHub] [kafka] yashmayya opened a new pull request, #14003: KAFKA-15182: Normalize source connector offsets before invoking SourceConnector::alterOffsets
yashmayya opened a new pull request, #14003: URL: https://github.com/apache/kafka/pull/14003 - From https://issues.apache.org/jira/browse/KAFKA-15182: >See discussion [here](https://github.com/apache/kafka/pull/13945#discussion_r1260946148) > > TLDR: When users attempt to externally modify source connector offsets via the `PATCH /offsets` endpoint (introduced in [KIP-875](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect)), type mismatches can occur between offsets passed to `SourceConnector::alterOffsets` and the offsets that are retrieved by connectors / tasks via an instance of `OffsetStorageReader` after the offsets have been modified. In order to prevent this type mismatch that could lead to subtle bugs in connectors, we could serialize + deserialize the offsets using the worker's internal JSON converter before invoking `SourceConnector::alterOffsets`. - I've also added a small unit test, verified that the existing offsets API related integration tests are passing, and tested this patch out manually with the `FileStreamSourceConnector`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15182) Normalize offsets before invoking SourceConnector::alterOffsets
Yash Mayya created KAFKA-15182: -- Summary: Normalize offsets before invoking SourceConnector::alterOffsets Key: KAFKA-15182 URL: https://issues.apache.org/jira/browse/KAFKA-15182 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya See discussion [here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148] TLDR: When users attempt to externally modify source connector offsets via the {{PATCH /offsets}} endpoint (introduced in [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]), type mismatches can occur between offsets passed to {{SourceConnector::alterOffsets}} and the offsets that are retrieved by connectors / tasks via an instance of {{OffsetStorageReader}} after the offsets have been modified. In order to prevent this type mismatch that could lead to subtle bugs in connectors, we could serialize + deserialize the offsets using the worker's internal JSON converter before invoking {{SourceConnector::alterOffsets}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on pull request #13853: KAFKA-15088: Fixing Incorrect Reference Usage in Connector State Changes
gharris1727 commented on PR #13853: URL: https://github.com/apache/kafka/pull/13853#issuecomment-1632939211 Hey @daehokimm, thanks for the PR! I think that there is some semantic difference between `AbstractStatus.State` and `ConnectorStatus.State`, and certainly between `ConnectorStatus.State` and `TaskStatus.State`. Unfortunately, Java doesn't agree: the type-checker makes no distinction between these types. Without the type-checker to enforce the distinction, I don't think we're going to catch all of the mistakes that already exist, or prevent new ones from being added. What do you think about refactoring AbstractStatus.State into distinct enums? It would help us find all of the other places where this typo may exist. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
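Editor's note: a rough sketch of the refactoring suggested above, with illustrative names and state values (not the actual Kafka Connect classes). Once the states live in distinct enums, passing a connector state where a task state is expected becomes a compile-time error instead of a silent typo.

```
public class DistinctStateEnums {
    // Hypothetical split of a single shared state enum into per-entity enums.
    enum ConnectorState { UNASSIGNED, RUNNING, PAUSED, STOPPED, FAILED }
    enum TaskState { UNASSIGNED, RUNNING, PAUSED, FAILED }

    static void recordConnectorTransition(ConnectorState state) {
        System.out.println("connector -> " + state);
    }

    static void recordTaskTransition(TaskState state) {
        System.out.println("task -> " + state);
    }

    public static void main(String[] args) {
        recordConnectorTransition(ConnectorState.STOPPED);
        recordTaskTransition(TaskState.RUNNING);
        // recordTaskTransition(ConnectorState.RUNNING); // no longer compiles: the typo is caught
    }
}
```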
[GitHub] [kafka] philipnee commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest
philipnee commented on PR #13956: URL: https://github.com/apache/kafka/pull/13956#issuecomment-1632936434 Hey @divijvaidya - I actually tried to fix this probably a year ago, and my guess was that some of the consumers didn't close cleanly and caused the leak. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
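Editor's note: the usual shape of the fix for that failure mode, sketched below under the assumption that the leak comes from consumers that were never closed. `KafkaConsumer` is `Closeable`, so try-with-resources guarantees its background resources are released even when the test body throws.

```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerCloseSketch {
    static void consumeOnce(Properties props) {
        // try-with-resources always invokes close(), shutting down the
        // consumer's threads and sockets instead of leaking them across tests.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
```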
[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jolshan commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261496901 ## core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala: ## @@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data) } - @ClusterTest(serverProperties = Array( + @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( Review Comment: I guess we will do many of these test changes in a followup though? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jolshan commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261492024 ## core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala: ## @@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data) } - @ClusterTest(serverProperties = Array( + @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( Review Comment: Hmmm, does this also mean that we are no longer testing the old group coordinator (at least in this test)? EDIT: I see the name of the test now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jolshan commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261492024 ## core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala: ## @@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data) } - @ClusterTest(serverProperties = Array( + @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( Review Comment: Hmmm, does this also mean that we are no longer testing the old group coordinator (at least in this test)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jolshan commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1261475273 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -175,7 +176,7 @@ object Defaults { val ConsumerGroupMinHeartbeatIntervalMs = 5000 val ConsumerGroupMaxHeartbeatIntervalMs = 15000 val ConsumerGroupMaxSize = Int.MaxValue - val ConsumerGroupAssignors = "" + val ConsumerGroupAssignors = List(classOf[RangeAssignor].getName).asJava Review Comment: Did we need to do this because the default couldn't be empty? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
vamossagar12 commented on PR #13158: URL: https://github.com/apache/kafka/pull/13158#issuecomment-1632883931 > I took a look at the changes and they look fine. If I understand correctly we're moving these classes to server-common while we have to keep MirrorMaker. Then in Kafka 4.0 (and assuming [KAFKA-14525](https://issues.apache.org/jira/browse/KAFKA-14525) is complete) we will be able to move all these classes to tools. Is that right? Thanks @mimaison , yes that's the idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1261444204 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ## @@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() { assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2); } +@Test +public void testReceiveFailedBatchTwiceWithTransactions() throws Exception { +ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); +apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); +TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 6, 100, apiVersions); + +setupWithTransactionState(txnManager); +doInitTransactions(txnManager, producerIdAndEpoch); + +txnManager.beginTransaction(); +txnManager.maybeAddPartition(tp0); +client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE))); +sender.runOnce(); + +// Send first ProduceRequest +Future request1 = appendToAccumulator(tp0); +sender.runOnce(); // send request + +Node node = metadata.fetch().nodes().get(0); +time.sleep(2000L); +client.disconnect(node.idString(), true); +client.backoff(node, 10); + +sender.runOnce(); // now expire the batch. +assertFutureFailure(request1, TimeoutException.class); + +time.sleep(20); + +sendIdempotentProducerResponse(0, tp0, Errors.INVALID_RECORD, -1); +sender.runOnce(); // receive late response + +// Loop once and confirm that the transaction manager does not enter a fatal error state +sender.runOnce(); +assertTrue(txnManager.hasAbortableError()); +TransactionalRequestResult result = txnManager.beginAbort(); +sender.runOnce(); + +respondToEndTxn(Errors.NONE); +sender.runOnce(); +assertTrue(txnManager::isInitializing); Review Comment: I'm following conventions from the rest of the file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1261443466 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ## @@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() { assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2); } +@Test +public void testReceiveFailedBatchTwiceWithTransactions() throws Exception { +ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); +apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); +TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 6, 100, apiVersions); + +setupWithTransactionState(txnManager); +doInitTransactions(txnManager, producerIdAndEpoch); + +txnManager.beginTransaction(); +txnManager.maybeAddPartition(tp0); +client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE))); +sender.runOnce(); + +// Send first ProduceRequest +Future request1 = appendToAccumulator(tp0); +sender.runOnce(); // send request + +Node node = metadata.fetch().nodes().get(0); +time.sleep(2000L); +client.disconnect(node.idString(), true); +client.backoff(node, 10); + +sender.runOnce(); // now expire the batch. Review Comment: I think it is both? This wording is the same as the other tests in the file that expire batches. (There are 4 others in the file). If I remove one or the other the test doesn't work correctly. Without the disconnect, we are not really able to receive two responses since I modified the disconnect code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
C0urante commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1261413255 ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " + +"This is because stdin is used for input and offsets are not tracked."); +} + +// This connector makes use of a single source partition at a time which represents the file that it is configured to read from. +// However, there could also be source partitions from previous configurations of the connector. +for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) { +Map<String, ?> partition = partitionOffset.getKey(); +if (partition == null) { +throw new ConnectException("Partition objects cannot be null"); +} + +if (!partition.containsKey(FILENAME_FIELD)) { +throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); +} + +Map<String, ?> offset = partitionOffset.getValue(); +// null offsets are allowed and represent a deletion of offsets for a partition +if (offset == null) { +return true; +} + +if (!offset.containsKey(POSITION_FIELD)) { +throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'"); +} + +// The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value +try { +long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD))); Review Comment: Ah, nice catch! I noticed the discrepancy in numeric types while working on [KAFKA-15177](https://issues.apache.org/jira/browse/KAFKA-15177) but hadn't even considered the possibility of aligning the types across invocations of `alterOffsets` and `OffsetStorageReader::offset`/`OffsetStorageReader::offsets`. I think re-deserializing the offsets before passing them to `alterOffsets` is a great idea. Unless the request body is gigantic there shouldn't be serious performance concerns, and it also acts as a nice preflight check to ensure that the offsets can be successfully propagated to the connector's tasks through the offsets topic. I still don't love permitting string types for the connector's `position` offset values--it doesn't seem like a great endorsement of our API if we have to implement workarounds in the file connectors, which are the first example of the connector API that many developers see. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
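Editor's note: to spell out the trade-off being debated, `Long.parseLong(String.valueOf(...))` is permissive by construction, accepting `Integer`, `Long`, and numeric `String` values alike. A small sketch of that validation in isolation follows; the helper is hypothetical, not the connector's actual code.

```
public class PositionValidationSketch {
    // Hypothetical helper mirroring the permissive check under discussion:
    // accept any value whose string form parses as a non-negative long.
    static long parsePosition(Object raw) {
        long position;
        try {
            position = Long.parseLong(String.valueOf(raw));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("position must be a number, got: " + raw, e);
        }
        if (position < 0) {
            throw new IllegalArgumentException("position must be non-negative, got: " + position);
        }
        return position;
    }

    public static void main(String[] args) {
        System.out.println(parsePosition(10));   // Integer -> 10
        System.out.println(parsePosition(10L));  // Long -> 10
        System.out.println(parsePosition("10")); // String -> 10 (the part C0urante is uneasy about)
    }
}
```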
[GitHub] [kafka] nkostoulas opened a new pull request, #14002: Move throttling rate to KafkaConfig
nkostoulas opened a new pull request, #14002: URL: https://github.com/apache/kafka/pull/14002 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
[ https://issues.apache.org/jira/browse/KAFKA-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14938: -- Fix Version/s: 3.5.2 > Flaky test > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary > -- > > Key: KAFKA-14938 > URL: https://issues.apache.org/jira/browse/KAFKA-14938 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sambhav Jain >Priority: Major > Fix For: 3.6.0, 3.4.2, 3.5.2 > > > Test seems to be failing with > ``` > ava.lang.AssertionError: Not enough records produced by source connector. > Expected at least: 100 + but got 72 > h4. Stacktrace > java.lang.AssertionError: Not enough records produced by source connector. > Expected at least: 100 + but got 72 > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) > at > org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) > at >
[jira] [Updated] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
[ https://issues.apache.org/jira/browse/KAFKA-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14938: -- Fix Version/s: 3.4.2 > Flaky test > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary > -- > > Key: KAFKA-14938 > URL: https://issues.apache.org/jira/browse/KAFKA-14938 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sambhav Jain >Priority: Major > Fix For: 3.6.0, 3.4.2 > > > Test seems to be failing with > ``` > ava.lang.AssertionError: Not enough records produced by source connector. > Expected at least: 100 + but got 72 > h4. Stacktrace > java.lang.AssertionError: Not enough records produced by source connector. > Expected at least: 100 + but got 72 > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) > at > org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) > at >
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1261385072 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ## @@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() { assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2); } +@Test +public void testReceiveFailedBatchTwiceWithTransactions() throws Exception { +ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); +apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); +TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 6, 100, apiVersions); + +setupWithTransactionState(txnManager); +doInitTransactions(txnManager, producerIdAndEpoch); + +txnManager.beginTransaction(); +txnManager.maybeAddPartition(tp0); +client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE))); +sender.runOnce(); + +// Send first ProduceRequest +Future request1 = appendToAccumulator(tp0); +sender.runOnce(); // send request + +Node node = metadata.fetch().nodes().get(0); +time.sleep(2000L); Review Comment: If we don't sleep, then the future does not complete. There is another test in the file that explains this only expires the batch if it hits the delivery timeout. Note in the example, time.sleep(1000) was already called. ``` // We add 600 millis to expire the first batch but not the second. // Note deliveryTimeoutMs is 1500. time.sleep(600L); client.disconnect(node.idString()); client.backoff(node, 10); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1261385072 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ## @@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() { assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2); } +@Test +public void testReceiveFailedBatchTwiceWithTransactions() throws Exception { +ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); +apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); +TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 6, 100, apiVersions); + +setupWithTransactionState(txnManager); +doInitTransactions(txnManager, producerIdAndEpoch); + +txnManager.beginTransaction(); +txnManager.maybeAddPartition(tp0); +client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE))); +sender.runOnce(); + +// Send first ProduceRequest +Future request1 = appendToAccumulator(tp0); +sender.runOnce(); // send request + +Node node = metadata.fetch().nodes().get(0); +time.sleep(2000L); Review Comment: If we don't sleep, then the future does not complete. There is another test in the file that explains this only expires the batch if it hits the delivery timeout ``` // We add 600 millis to expire the first batch but not the second. // Note deliveryTimeoutMs is 1500. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
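Editor's note: a quick sketch of the timing arithmetic in the test quoted above, assuming `deliveryTimeoutMs` is 1500 as the comment states. After the earlier `time.sleep(1000)` plus the extra `time.sleep(600)`, only the first batch has been sitting longer than the delivery timeout.

```
public class ExpiryArithmeticSketch {
    public static void main(String[] args) {
        long deliveryTimeoutMs = 1500;   // per the quoted test comment
        long batch1CreatedAtMs = 0;      // first batch appended at t = 0
        long batch2CreatedAtMs = 1000;   // second batch appended after time.sleep(1000)
        long nowMs = 1000 + 600;         // the additional time.sleep(600L)

        // Only the first batch has exceeded the delivery timeout at t = 1600.
        System.out.println(nowMs - batch1CreatedAtMs >= deliveryTimeoutMs); // true: expired
        System.out.println(nowMs - batch2CreatedAtMs >= deliveryTimeoutMs); // false: still alive
    }
}
```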
[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13975: KAFKA-15161: Fix InvalidReplicationFactorException at connect startup
viktorsomogyi commented on code in PR #13975: URL: https://github.com/apache/kafka/pull/13975#discussion_r1261378645 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -174,14 +174,25 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w errorUnavailableEndpoints: Boolean = false, errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = { val image = _currentImage -topics.toSeq.flatMap { topic => - getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => +if (!isInitialized()) { + topics.toSeq.map(topic => Review Comment: For ZooKeeper-based Kafka, I think this would be a good solution. With KRaft, though, it seems it already returns an `UNKNOWN_TOPIC_OR_PARTITION` error, so the fix may not be needed there. I'll do some more digging tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13975: KAFKA-15161: Fix InvalidReplicationFactorException at connect startup
viktorsomogyi commented on code in PR #13975: URL: https://github.com/apache/kafka/pull/13975#discussion_r1261378645 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -174,14 +174,25 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w errorUnavailableEndpoints: Boolean = false, errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = { val image = _currentImage -topics.toSeq.flatMap { topic => - getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => +if (!isInitialized()) { + topics.toSeq.map(topic => Review Comment: For ZooKeeper-based Kafka, I think this would be a good solution. With KRaft, though, it seems it already returns an `UNKNOWN_TOPIC_OR_PARTITION` error, so the fix may not be needed there. I'll do some more digging tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
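Editor's note: a sketch of the guard pattern being discussed, transposed to Java for illustration (the real change is Scala in KRaftMetadataCache; names and types here are hypothetical). When the cache has not yet caught up, every requested topic is answered with a retriable error rather than attempting a partition-metadata lookup that cannot succeed.

```
import java.util.ArrayList;
import java.util.List;

public class MetadataGuardSketch {
    enum ErrorCode { NONE, UNKNOWN_TOPIC_OR_PARTITION }

    static final class TopicMetadata {
        final String topic;
        final ErrorCode error;
        TopicMetadata(String topic, ErrorCode error) {
            this.topic = topic;
            this.error = error;
        }
    }

    private volatile boolean initialized = false;

    List<TopicMetadata> topicMetadata(List<String> topics) {
        List<TopicMetadata> result = new ArrayList<>();
        if (!initialized) {
            // Retriable error: a client polling at startup (e.g. Connect creating its
            // internal topics) backs off and retries instead of surfacing a confusing
            // InvalidReplicationFactorException.
            for (String topic : topics) {
                result.add(new TopicMetadata(topic, ErrorCode.UNKNOWN_TOPIC_OR_PARTITION));
            }
            return result;
        }
        for (String topic : topics) {
            result.add(new TopicMetadata(topic, ErrorCode.NONE)); // normal lookup elided
        }
        return result;
    }
}
```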
[GitHub] [kafka] C0urante merged pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
C0urante merged PR #13646: URL: https://github.com/apache/kafka/pull/13646 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
[ https://issues.apache.org/jira/browse/KAFKA-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14938: -- Fix Version/s: 3.6.0 > Flaky test > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary > -- > > Key: KAFKA-14938 > URL: https://issues.apache.org/jira/browse/KAFKA-14938 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sambhav Jain >Priority: Major > Fix For: 3.6.0 > > > Test seems to be failing with > ``` > ava.lang.AssertionError: Not enough records produced by source connector. > Expected at least: 100 + but got 72 > h4. Stacktrace > java.lang.AssertionError: Not enough records produced by source connector. > Expected at least: 100 + but got 72 > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) > at > org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) > at >
[GitHub] [kafka] nicktelford commented on a diff in pull request #13993: KAFKA-15178: Improve ConsumerCoordinator.poll perf
nicktelford commented on code in PR #13993: URL: https://github.com/apache/kafka/pull/13993#discussion_r1261346057 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -119,6 +119,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private Set<String> joinedSubscription; private MetadataSnapshot metadataSnapshot; private MetadataSnapshot assignmentSnapshot; +private boolean metadataUpdated; Review Comment: Note: in 633f0e7 I switched to using a `LinkedHashSet`, because it performs better than `HashSet` for iteration (which is used during `equals`), at the cost of a little more memory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
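Editor's note: a small illustration of the trade-off mentioned (not a rigorous benchmark, no warm-up). Iterating a `HashSet` walks its whole bucket array, so cost scales with capacity, while `LinkedHashSet` follows a linked list of entries, so cost scales with size; the links are the extra memory being paid for. Set `equals` relies on iteration, which is why this matters on a hot path.

```
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class SetIterationSketch {
    public static void main(String[] args) {
        // Same ten elements, but a deliberately oversized initial capacity.
        Set<Integer> hash = new HashSet<>(1_000_000);
        Set<Integer> linked = new LinkedHashSet<>(1_000_000);
        for (int i = 0; i < 10; i++) {
            hash.add(i);
            linked.add(i);
        }

        long t0 = System.nanoTime();
        hash.forEach(x -> { });    // scans ~1M buckets to find 10 entries
        long t1 = System.nanoTime();
        linked.forEach(x -> { });  // follows 10 links
        long t2 = System.nanoTime();
        System.out.printf("HashSet: %d ns, LinkedHashSet: %d ns%n", t1 - t0, t2 - t1);
    }
}
```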
[GitHub] [kafka] lucasbru opened a new pull request, #14001: Kafka Streams Threading: Exception handling
lucasbru opened a new pull request, #14001: URL: https://github.com/apache/kafka/pull/14001 Implements punctuation inside processing threads. The scheduler algorithm checks whether a task that is not currently assigned can be punctuated, and returns it when a worker thread asks for the next task to be processed. Then, the processing thread runs all punctuations in the punctuation queue. Piggy-backed: take TaskExecutionMetadata into account when processing records. This is a stacked PR; only the last commit needs to be reviewed. ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
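Editor's note: a very rough sketch of the scheduling idea described in the PR text, with hypothetical types (the actual implementation lives in the Kafka Streams threading code under review). When a worker thread asks for work, the scheduler hands out an unassigned task with due punctuations first; the worker then drains that task's punctuation queue before returning to record processing.

```
import java.util.ArrayDeque;
import java.util.Queue;

public class PunctuationSchedulingSketch {
    interface Task {
        boolean isAssigned();                  // currently held by some worker thread?
        boolean hasDuePunctuations(long nowMs);
        void punctuateAll(long nowMs);         // run every due punctuation for this task
    }

    private final Queue<Task> processableTasks = new ArrayDeque<>();

    // Called by a worker thread asking for its next unit of work.
    // (A real scheduler would also mark the returned task as assigned.)
    synchronized Task nextTask(long nowMs) {
        for (Task task : processableTasks) {
            if (!task.isAssigned() && task.hasDuePunctuations(nowMs)) {
                return task; // punctuation takes priority over plain record processing
            }
        }
        return processableTasks.poll();
    }
}
```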
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest
divijvaidya commented on code in PR #13956: URL: https://github.com/apache/kafka/pull/13956#discussion_r1261325930 ## core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala: ## @@ -346,6 +346,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest { partitionsToAssign: Set[TopicPartition], userRebalanceListener: ConsumerRebalanceListener) extends ShutdownableThread("daemon-consumer-assignment", false) { +setDaemon(true) Review Comment: I added this as a fail-safe, but yes, it's not required if we close the consumers properly. I have reverted it in the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
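Editor's note: for context on the reverted fail-safe, marking a thread as a daemon means the JVM may exit while the thread is still running, so a leaked daemon thread cannot keep a test JVM alive, but it also masks the underlying cleanup bug. A minimal demonstration:

```
public class DaemonThreadSketch {
    public static void main(String[] args) {
        Thread leaked = new Thread(() -> {
            while (true) {
                // simulates a consumer thread that was never shut down
            }
        }, "daemon-consumer-assignment");
        leaked.setDaemon(true); // without this, main() returning would not end the JVM
        leaked.start();
        System.out.println("main exiting; the daemon thread dies with the JVM");
    }
}
```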
[GitHub] [kafka] fvaleri commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
fvaleri commented on PR #13158: URL: https://github.com/apache/kafka/pull/13158#issuecomment-1632708004 Yes, once the MirrorMaker1 dependency is gone in 4.0.0, we can move them to tools. I added a note in [KAFKA-14705](https://issues.apache.org/jira/browse/KAFKA-14705). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14705) Tools cleanup for the next major release
[ https://issues.apache.org/jira/browse/KAFKA-14705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Federico Valeri updated KAFKA-14705:

Description:
We can use this task to track tools cleanup for the next major release (4.0.0).

1. Redirections to be removed:
- core/src/main/scala/kafka/tools/JmxTool
- core/src/main/scala/kafka/tools/ClusterTool
- core/src/main/scala/kafka/tools/StateChangeLogMerger
- core/src/main/scala/kafka/tools/EndToEndLatency
- core/src/main/scala/kafka/admin/FeatureCommand
- core/src/main/scala/kafka/tools/StreamsResetter

2. Deprecated tools to be removed:
- tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger

3. TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" should be moved to "tools" once we get rid of the MirrorMaker1 dependency.

4. We should also get rid of many deprecated options across all tools, including not migrated tools.

was:
We can use this task to track tools cleanup for the next major release (4.0.0). As part of this activity, we should also get rid of many deprecated options across all tools, including not migrated tools.

Redirections to be removed:
- core/src/main/scala/kafka/tools/JmxTool
- core/src/main/scala/kafka/tools/ClusterTool
- core/src/main/scala/kafka/tools/StateChangeLogMerger
- core/src/main/scala/kafka/tools/EndToEndLatency
- core/src/main/scala/kafka/admin/FeatureCommand
- core/src/main/scala/kafka/tools/StreamsResetter

Deprecated tools to be removed:
- tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger

TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" should be moved to "tools" once we get rid of MirrorMaker1 dependency.

> Tools cleanup for the next major release
>
> Key: KAFKA-14705
> URL: https://issues.apache.org/jira/browse/KAFKA-14705
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Federico Valeri
> Priority: Major
> Fix For: 4.0.0
>
> We can use this task to track tools cleanup for the next major release (4.0.0).
>
> 1. Redirections to be removed:
> - core/src/main/scala/kafka/tools/JmxTool
> - core/src/main/scala/kafka/tools/ClusterTool
> - core/src/main/scala/kafka/tools/StateChangeLogMerger
> - core/src/main/scala/kafka/tools/EndToEndLatency
> - core/src/main/scala/kafka/admin/FeatureCommand
> - core/src/main/scala/kafka/tools/StreamsResetter
>
> 2. Deprecated tools to be removed:
> - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger
>
> 3. TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" should be moved to "tools" once we get rid of MirrorMaker1 dependency.
>
> 4. We should also get rid of many deprecated options across all tools, including not migrated tools.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] nicktelford commented on a diff in pull request #13993: KAFKA-15178: Improve ConsumerCoordinator.poll perf
nicktelford commented on code in PR #13993: URL: https://github.com/apache/kafka/pull/13993#discussion_r1261318387

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:

@@ -119,6 +119,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private Set<String> joinedSubscription;
     private MetadataSnapshot metadataSnapshot;
     private MetadataSnapshot assignmentSnapshot;
+    private boolean metadataUpdated;

Review Comment: I've revised the implementation to instead attempt to optimize the comparison itself in 63ee9ab, but I'm not entirely sure it will perform better. I'd like to construct a micro-benchmark for it, but I honestly have no idea how to run a micro-benchmark on this part of the codebase.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
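On the micro-benchmark question above: Kafka's `jmh-benchmarks` module is the usual home for this kind of harness. A rough JMH sketch, using a stand-in map in place of the real `MetadataSnapshot` internals (class and field names are illustrative, not the actual code under test):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class SnapshotEqualsBenchmark {
    // Stand-in for the per-topic partition-count view a snapshot compares.
    private Map<String, Integer> current;
    private Map<String, Integer> other;

    @Setup
    public void setup() {
        current = new HashMap<>();
        other = new HashMap<>();
        for (int i = 0; i < 1_000; i++) {
            current.put("topic-" + i, 16);
            other.put("topic-" + i, 16);
        }
    }

    @Benchmark
    public boolean snapshotEquals() {
        // The cost being measured: a full equality check on every poll.
        return current.equals(other);
    }
}
```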
[jira] [Updated] (KAFKA-14705) Tools cleanup for the next major release
[ https://issues.apache.org/jira/browse/KAFKA-14705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Federico Valeri updated KAFKA-14705:

Description:
We can use this task to track tools cleanup for the next major release (4.0.0). As part of this activity, we should also get rid of many deprecated options across all tools, including not migrated tools.

Redirections to be removed:
- core/src/main/scala/kafka/tools/JmxTool
- core/src/main/scala/kafka/tools/ClusterTool
- core/src/main/scala/kafka/tools/StateChangeLogMerger
- core/src/main/scala/kafka/tools/EndToEndLatency
- core/src/main/scala/kafka/admin/FeatureCommand
- core/src/main/scala/kafka/tools/StreamsResetter

Deprecated tools to be removed:
- tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger

TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" should be moved to "tools" once we get rid of MirrorMaker1 dependency.

was:
We can use this task to track tools cleanup for the next major release (4.0.0). As part of this activity, we should also get rid of many deprecated options across all tools, including not migrated tools.

Redirections to be removed:
- core/src/main/scala/kafka/tools/JmxTool
- core/src/main/scala/kafka/tools/ClusterTool
- core/src/main/scala/kafka/tools/StateChangeLogMerger
- core/src/main/scala/kafka/tools/EndToEndLatency
- core/src/main/scala/kafka/admin/FeatureCommand
- core/src/main/scala/kafka/tools/StreamsResetter

Deprecated tools to be removed:
- tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger

> Tools cleanup for the next major release
>
> Key: KAFKA-14705
> URL: https://issues.apache.org/jira/browse/KAFKA-14705
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Federico Valeri
> Priority: Major
> Fix For: 4.0.0
>
> We can use this task to track tools cleanup for the next major release (4.0.0).
> As part of this activity, we should also get rid of many deprecated options across all tools, including not migrated tools.
>
> Redirections to be removed:
> - core/src/main/scala/kafka/tools/JmxTool
> - core/src/main/scala/kafka/tools/ClusterTool
> - core/src/main/scala/kafka/tools/StateChangeLogMerger
> - core/src/main/scala/kafka/tools/EndToEndLatency
> - core/src/main/scala/kafka/admin/FeatureCommand
> - core/src/main/scala/kafka/tools/StreamsResetter
>
> Deprecated tools to be removed:
> - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger
>
> TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" should be moved to "tools" once we get rid of MirrorMaker1 dependency.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
mimaison commented on PR #13158: URL: https://github.com/apache/kafka/pull/13158#issuecomment-1632638912 I took a look at the changes and they look fine. If I understand correctly, we're moving these classes to server-common while we still have to keep MirrorMaker. Then in Kafka 4.0 (and assuming [KAFKA-14525](https://issues.apache.org/jira/browse/KAFKA-14525) is complete) we will be able to move all these classes to tools. Is that right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #13988: [KAFKA-15137] Do not log entire request payload in KRaftControllerChannelManager
mumrah commented on PR #13988: URL: https://github.com/apache/kafka/pull/13988#issuecomment-1632618714 Thanks @divijvaidya! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest
divijvaidya commented on PR #13956: URL: https://github.com/apache/kafka/pull/13956#issuecomment-1632593879

> Did you verify the thread leak by checking the thread dump?

No, I found the leak from a very useful line in the tests I added recently. It prints the names of the leaked threads: `Found 3 unexpected threads during @AfterAll: controller-event-thread,daemon-bounce-broker-EventThread,Test worker-EventThread ==> expected: but was:`

@philipnee I wanted your expert opinion on the possible reason for the failure of `ConsumerBounceTest > testConsumptionWithBrokerFailures`. See the stack trace in the description. Is there anything that pops into your head that could make it less flaky (given where it fails in the stack trace)? Asking because this failure was the cause of the mayhem that leaked threads.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
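A sketch of what such an `@AfterAll` leak check can look like (JUnit 5; the thread allow-list and message format here are assumptions, not the actual Kafka test utility):

```java
import java.util.Set;
import java.util.stream.Collectors;

import org.junit.jupiter.api.AfterAll;
import static org.junit.jupiter.api.Assertions.assertEquals;

class ThreadLeakCheck {
    // Threads the JVM and test framework are expected to own (assumed list).
    private static final Set<String> EXPECTED_PREFIXES =
        Set.of("main", "Finalizer", "Reference Handler", "Signal Dispatcher");

    @AfterAll
    static void verifyNoThreadLeak() {
        Set<String> unexpected = Thread.getAllStackTraces().keySet().stream()
            .map(Thread::getName)
            .filter(name -> EXPECTED_PREFIXES.stream().noneMatch(name::startsWith))
            .collect(Collectors.toSet());
        // Putting the names in the assertion message is what makes a leak
        // like "controller-event-thread" easy to attribute to its owner.
        assertEquals(Set.of(), unexpected,
            "Found " + unexpected.size() + " unexpected threads during @AfterAll: "
                + String.join(",", unexpected));
    }
}
```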
[jira] [Created] (KAFKA-15181) Race condition on partition assigned to TopicBasedRemoteLogMetadataManager
Jorge Esteban Quilcate Otoya created KAFKA-15181:

Summary: Race condition on partition assigned to TopicBasedRemoteLogMetadataManager
Key: KAFKA-15181
URL: https://issues.apache.org/jira/browse/KAFKA-15181
Project: Kafka
Issue Type: Bug
Components: core
Reporter: Jorge Esteban Quilcate Otoya
Assignee: Jorge Esteban Quilcate Otoya

TopicBasedRemoteLogMetadataManager (TBRLMM) uses a cache so that it is prepared whenever partitions are assigned. When partitions are assigned to the TBRLMM instance, a consumer is started to keep the cache up to date. If the cache hasn't finished building, TBRLMM fails to return remote metadata about partitions that are stored on the backing topic. TBRLMM may not recover from this failing state.

A proposal to fix this issue would be to wait, after a partition is assigned, for the consumer to catch up. Similar logic is used at the moment when TBRLMM writes to the topic: it uses the send callback to wait for the consumer to catch up. This logic can be reused whenever a partition is assigned, so that when TBRLMM is marked as initialized, the cache is ready to serve requests.

Reference: https://github.com/aiven/kafka/issues/33

-- This message was sent by Atlassian Jira (v8.20.10#820010)
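A rough sketch of the proposed wait-for-catch-up on assignment; all names below are illustrative, not TBRLMM's actual API:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: block partition assignment until the metadata consumer
// has caught up to the backing topic's end offset, mirroring the wait the
// producer send-callback path already performs on writes.
class CatchUpOnAssignSketch {
    interface ConsumerTask {
        long endOffsetAtAssignment(); // log end offset when the partition was assigned
        long lastConsumedOffset();    // how far the cache-building consumer has read
    }

    static void waitUntilCaughtUp(ConsumerTask task, long timeoutMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (task.lastConsumedOffset() < task.endOffsetAtAssignment()) {
            if (System.nanoTime() > deadline) {
                throw new TimeoutException(
                    "Metadata cache did not catch up within " + timeoutMs + " ms");
            }
            Thread.sleep(100); // poll until the cache is ready to serve requests
        }
    }
}
```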
[GitHub] [kafka] OmniaGM commented on a diff in pull request #13585: KAFKA-14737: Move kafka.utils.json to server-common
OmniaGM commented on code in PR #13585: URL: https://github.com/apache/kafka/pull/13585#discussion_r1261125892

## server-common/src/main/java/org/apache/kafka/server/util/json/DecodeJson.java:

@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util.json;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public interface DecodeJson<T> {
+    /**
+     * Decode the JSON node provided into an instance of `T`.
+     *
+     * @throws JsonMappingException if `node` cannot be decoded into `T`.
+     */
+    T decode(JsonNode node) throws JsonMappingException;
+
+    static JsonMappingException throwJsonMappingException(String expectedType, JsonNode node) {
+        return new JsonMappingException(null, String.format("Expected `%s` value, received %s", expectedType, node));
+    }
+
+    final class DecodeBoolean implements DecodeJson<Boolean> {
+        @Override
+        public Boolean decode(JsonNode node) throws JsonMappingException {
+            if (node.isBoolean()) {
+                return node.booleanValue();
+            }
+            throw throwJsonMappingException(Boolean.class.getSimpleName(), node);
+        }
+    }
+
+    final class DecodeDouble implements DecodeJson<Double> {
+        @Override
+        public Double decode(JsonNode node) throws JsonMappingException {
+            if (node.isDouble() || node.isLong() || node.isInt()) {
+                return node.doubleValue();
+            }
+            throw throwJsonMappingException(Double.class.getSimpleName(), node);
+        }
+    }
+
+    final class DecodeInteger implements DecodeJson<Integer> {
+        @Override
+        public Integer decode(JsonNode node) throws JsonMappingException {
+            if (node.isInt()) {
+                return node.intValue();
+            }
+            throw throwJsonMappingException(Integer.class.getSimpleName(), node);
+        }
+    }
+
+    final class DecodeLong implements DecodeJson<Long> {
+        @Override
+        public Long decode(JsonNode node) throws JsonMappingException {
+            if (node.isLong() || node.isInt()) {
+                return node.longValue();
+            }
+            throw throwJsonMappingException(Long.class.getSimpleName(), node);
+        }
+    }
+
+    final class DecodeString implements DecodeJson<String> {
+        @Override
+        public String decode(JsonNode node) throws JsonMappingException {
+            if (node.isTextual()) {
+                return node.textValue();
+            }
+            throw throwJsonMappingException(String.class.getSimpleName(), node);
+        }
+    }
+
+    static <E> DecodeJson<Optional<E>> decodeOptional(DecodeJson<E> decodeJson) {
+        return node -> {
+            if (node.isNull()) return Optional.empty();
+            return Optional.of(decodeJson.decode(node));
+        };
+    }
+
+    static <E> DecodeJson<List<E>> decodeList(DecodeJson<E> decodeJson) {
+        return node -> {
+            if (node.isArray()) {
+                List<E> result = new ArrayList<>();
+                Iterator<JsonNode> elements = node.elements();
+                while (elements.hasNext()) {
+                    try {
+                        result.add(decodeJson.decode(elements.next()));
+                    } catch (JsonMappingException e) {
+                        throw e;

Review Comment: Probably not. I was trying to stick to the original interface (which doesn't have any exception in the signature) as much as I could, in case we switched everything to use this new class instead. I will remove the `catch` and make `decodeList` throw `JsonMappingException` instead.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
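For reference, a sketch of `decodeList` (inside the `DecodeJson` interface shown above) with the redundant catch dropped, as the comment proposes; since `decode` already declares `JsonMappingException` and the `DecodeJson` functional interface propagates it, the lambda can simply let it escape. The non-array fallback shown here is an assumption, as the quoted snippet is truncated before that branch:

```java
static <E> DecodeJson<List<E>> decodeList(DecodeJson<E> decodeJson) {
    return node -> {
        if (node.isArray()) {
            List<E> result = new ArrayList<>();
            Iterator<JsonNode> elements = node.elements();
            while (elements.hasNext()) {
                // decode declares JsonMappingException, so no
                // catch-and-rethrow is needed here.
                result.add(decodeJson.decode(elements.next()));
            }
            return result;
        }
        throw throwJsonMappingException("JSON array", node); // assumed fallback
    };
}
```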
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics
divijvaidya commented on code in PR #13944: URL: https://github.com/apache/kafka/pull/13944#discussion_r1261124589

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala:

@@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
     BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
     BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
     BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+    BrokerTopicStats.RemoteCopyBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
+    BrokerTopicStats.RemoteFetchBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
+    BrokerTopicStats.RemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteReadRequestsPerSec, "requests"),
+    BrokerTopicStats.RemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteWriteRequestsPerSec, "requests"),
+    BrokerTopicStats.FailedRemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteReadRequestsPerSec, "requests"),
+    BrokerTopicStats.FailedRemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteWriteRequestsPerSec, "requests"),

Review Comment: We (at Amazon MSK) have an implementation against the KIP-405 interfaces [1], and we explicitly tell customers that metrics such as `RemoteBytesInPerSec` are as per the KIP-405 contract. If we change the KIP now, it would be a break in contract for our customers. Hence, I would not be in favour of amending the accepted KIP at this time.

[1] See RemoteBytesInPerSec at https://docs.aws.amazon.com/msk/latest/developerguide/metrics-details.html

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
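Once registered, these meters surface over JMX like the other per-topic broker metrics; a minimal probe is sketched below. The object name follows the PR's naming rather than a settled convention, and `topic=orders` is a made-up example, pending the naming discussion above:

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class RemoteMeterProbe {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed name: per-topic meters usually follow the
        // kafka.server:type=BrokerTopicMetrics,name=...,topic=... pattern.
        ObjectName meter = new ObjectName(
            "kafka.server:type=BrokerTopicMetrics,name=RemoteCopyBytesPerSec,topic=orders");
        // Yammer meters expose rate attributes such as OneMinuteRate via JMX.
        Object oneMinuteRate = server.getAttribute(meter, "OneMinuteRate");
        System.out.println("RemoteCopyBytesPerSec (1m rate): " + oneMinuteRate);
    }
}
```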
[jira] [Commented] (KAFKA-15132) Implement disable & re-enablement for Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742409#comment-17742409 ]

Divij Vaidya commented on KAFKA-15132:

Update: need more time. New ETA: 10th July.

> Implement disable & re-enablement for Tiered Storage
>
> Key: KAFKA-15132
> URL: https://issues.apache.org/jira/browse/KAFKA-15132
> Project: Kafka
> Issue Type: New Feature
> Components: core
> Reporter: Divij Vaidya
> Assignee: Divij Vaidya
> Priority: Major
> Labels: kip
>
> KIP-405 [1] introduces the Tiered Storage feature in Apache Kafka. One of the limitations mentioned in the KIP is the inability to re-enable TS on a topic after it has been disabled.
> {quote}Once tier storage is enabled for a topic, it can not be disabled. We will add this feature in future versions. One possible workaround is to create a new topic and copy the data from the desired offset and delete the old topic.{quote}
> This task will propose a new KIP which extends KIP-405 to describe the behaviour on disablement and re-enablement of tiered storage for a topic. The solution will apply to both Zk and KRaft variants.
>
> [1] KIP-405 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14915) Option to consume multiple partitions that have their data in remote storage for the target offsets.
[ https://issues.apache.org/jira/browse/KAFKA-14915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Divij Vaidya updated KAFKA-14915:

Description: Context: https://github.com/apache/kafka/pull/13535#discussion_r1171250580

> Option to consume multiple partitions that have their data in remote storage for the target offsets.
>
> Key: KAFKA-14915
> URL: https://issues.apache.org/jira/browse/KAFKA-14915
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Satish Duggana
> Assignee: Kamal Chandraprakash
> Priority: Major
>
> Context: https://github.com/apache/kafka/pull/13535#discussion_r1171250580

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeqo commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics
jeqo commented on code in PR #13944: URL: https://github.com/apache/kafka/pull/13944#discussion_r1261114941

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala:

@@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
     BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
     BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
     BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+    BrokerTopicStats.RemoteCopyBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
+    BrokerTopicStats.RemoteFetchBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
+    BrokerTopicStats.RemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteReadRequestsPerSec, "requests"),
+    BrokerTopicStats.RemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteWriteRequestsPerSec, "requests"),
+    BrokerTopicStats.FailedRemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteReadRequestsPerSec, "requests"),
+    BrokerTopicStats.FailedRemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteWriteRequestsPerSec, "requests"),

Review Comment: I suspected this would be the case. Though, as the metrics haven't been implemented yet, would it be possible to amend the KIP and ask for feedback?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742407#comment-17742407 ]

Divij Vaidya commented on KAFKA-14912:

[~ckamal] if you aren't actively working on this, can we have someone else in the community pick this up?

> Introduce a configuration for remote index cache size, preferably a dynamic config.
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
> Issue Type: Sub-task
> Components: core
> Reporter: Satish Duggana
> Assignee: Kamal Chandraprakash
> Priority: Major

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison commented on a diff in pull request #13585: KAFKA-14737: Move kafka.utils.json to server-common
mimaison commented on code in PR #13585: URL: https://github.com/apache/kafka/pull/13585#discussion_r1261096752

## server-common/src/main/java/org/apache/kafka/server/util/json/DecodeJson.java:

@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util.json;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public interface DecodeJson<T> {
+    /**
+     * Decode the JSON node provided into an instance of `T`.
+     *
+     * @throws JsonMappingException if `node` cannot be decoded into `T`.
+     */
+    T decode(JsonNode node) throws JsonMappingException;
+
+    static JsonMappingException throwJsonMappingException(String expectedType, JsonNode node) {
+        return new JsonMappingException(null, String.format("Expected `%s` value, received %s", expectedType, node));
+    }
+
+    final class DecodeBoolean implements DecodeJson<Boolean> {
+        @Override
+        public Boolean decode(JsonNode node) throws JsonMappingException {
+            if (node.isBoolean()) {
+                return node.booleanValue();
+            }
+            throw throwJsonMappingException(Boolean.class.getSimpleName(), node);
+        }
+    }
+
+    final class DecodeDouble implements DecodeJson<Double> {
+        @Override
+        public Double decode(JsonNode node) throws JsonMappingException {
+            if (node.isDouble() || node.isLong() || node.isInt()) {
+                return node.doubleValue();
+            }
+            throw throwJsonMappingException(Double.class.getSimpleName(), node);
+        }
+    }
+
+    final class DecodeInteger implements DecodeJson<Integer> {
+        @Override
+        public Integer decode(JsonNode node) throws JsonMappingException {
+            if (node.isInt()) {
+                return node.intValue();
+            }
+            throw throwJsonMappingException(Integer.class.getSimpleName(), node);
+        }
+    }
+
+    final class DecodeLong implements DecodeJson<Long> {
+        @Override
+        public Long decode(JsonNode node) throws JsonMappingException {
+            if (node.isLong() || node.isInt()) {
+                return node.longValue();
+            }
+            throw throwJsonMappingException(Long.class.getSimpleName(), node);
+        }
+    }
+
+    final class DecodeString implements DecodeJson<String> {
+        @Override
+        public String decode(JsonNode node) throws JsonMappingException {
+            if (node.isTextual()) {
+                return node.textValue();
+            }
+            throw throwJsonMappingException(String.class.getSimpleName(), node);
+        }
+    }
+
+    static <E> DecodeJson<Optional<E>> decodeOptional(DecodeJson<E> decodeJson) {
+        return node -> {
+            if (node.isNull()) return Optional.empty();
+            return Optional.of(decodeJson.decode(node));
+        };
+    }
+
+    static <E> DecodeJson<List<E>> decodeList(DecodeJson<E> decodeJson) {
+        return node -> {
+            if (node.isArray()) {
+                List<E> result = new ArrayList<>();
+                Iterator<JsonNode> elements = node.elements();
+                while (elements.hasNext()) {
+                    try {
+                        result.add(decodeJson.decode(elements.next()));
+                    } catch (JsonMappingException e) {
+                        throw e;

Review Comment: Do we need this `catch` if we rethrow the same exception directly? Same in `decodeMap()` below.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org