[GitHub] [kafka] Justinwins commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy
Justinwins commented on a change in pull request #10652: URL: https://github.com/apache/kafka/pull/10652#discussion_r684042834 ## File path: docs/upgrade.html ## @@ -83,7 +83,13 @@ Notable changes in 3 understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass in new ConsumerGroupMetadata(consumerGroupId) to work with older brokers. See https://cwiki.apache.org/confluence/x/zJONCg;>KIP-732 for more details. - + The Connect-based MirrorMaker (MM2) includes changes to support IdentityReplicationPolicy, enabling replication without renaming topics. +The existing DefaultReplicationPolicy is still used by default, but identity replication can be enabled via the +replication.policy configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for Review comment: i think it's more clear to say "replication.policy.class " here ,you know , which means it's configured in that form in mm2.properties file. Friendly to beginners . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13210) fetch/findSessions queries with open endpoints for SessionStore/WindowStore
Luke Chen created KAFKA-13210: - Summary: fetch/findSessions queries with open endpoints for SessionStore/WindowStore Key: KAFKA-13210 URL: https://issues.apache.org/jira/browse/KAFKA-13210 Project: Kafka Issue Type: Bug Components: streams Reporter: Luke Chen Assignee: Luke Chen This is the implementation of KIP-766: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186876596] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
guozhangwang commented on pull request #11184: URL: https://github.com/apache/kafka/pull/11184#issuecomment-900710127 Cherry-picked to 3.0 to be included in the 3.0 docs cc @kkonstantine -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
guozhangwang merged pull request #11184: URL: https://github.com/apache/kafka/pull/11184 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
guozhangwang commented on a change in pull request #11184: URL: https://github.com/apache/kafka/pull/11184#discussion_r690792840 ## File path: docs/streams/upgrade-guide.html ## @@ -52,6 +52,15 @@ Upgrade Guide and API Changes restart all new ({{fullDotVersion}}) application instances + +Since 3.0.0 release, Kafka Streams uses a newer RocksDB version which bumped its footer version persisted on files. Review comment: Ack. ## File path: docs/streams/upgrade-guide.html ## @@ -52,6 +52,15 @@ Upgrade Guide and API Changes restart all new ({{fullDotVersion}}) application instances + +Since 3.0.0 release, Kafka Streams uses a newer RocksDB version which bumped its footer version persisted on files. +This means that old versioned RocksDB would not be able to recognize the bytes written by that newer versioned RocksDB, Review comment: Ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
guozhangwang commented on a change in pull request #11184: URL: https://github.com/apache/kafka/pull/11184#discussion_r690792216 ## File path: docs/streams/upgrade-guide.html ## @@ -52,6 +52,15 @@ Upgrade Guide and API Changes restart all new ({{fullDotVersion}}) application instances + +Since 3.0.0 release, Kafka Streams uses a newer RocksDB version which bumped its footer version persisted on files. +This means that old versioned RocksDB would not be able to recognize the bytes written by that newer versioned RocksDB, +and hence it is harder to downgrade Kafka Streams with version 3.0.0 or newer to older versions in-flight. +Users need to wipe out the local RocksDB state stores written by the new versioned Kafka Streams before swapping in the +older versioned Kafka Streams bytecode, which when then restore the state stores with the old versioned footer from the Review comment: ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8295) Add merge() operator to State Stores.
[ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400672#comment-17400672 ] Guozhang Wang commented on KAFKA-8295: -- Just to add some context here: we are having some thoughts about refactoring some state stores for windowed joins, where list-append types would be considered (cc [~vcrfxia] here). We can probably consider doing this together with the list-append types since it would be a major refactoring. > Add merge() operator to State Stores. > - > > Key: KAFKA-8295 > URL: https://issues.apache.org/jira/browse/KAFKA-8295 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Major > > In addition to regular put/get/delete RocksDB provides a fourth operation, > merge. This essentially provides an optimized read/update/write path in a > single operation. One of the built-in (C++) merge operators exposed over the > Java API is a counter. We should be able to leverage this for a more > efficient implementation of count() > > (Note: Unfortunately it seems unlikely we can use this to optimize general > aggregations, even if RocksJava allowed for a custom merge operator, unless > we provide a way for the user to specify and connect a C++ implemented > aggregator – otherwise we incur too much cost crossing the jni for a net > performance benefit) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation
[ https://issues.apache.org/jira/browse/KAFKA-13197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400653#comment-17400653 ] Guozhang Wang commented on KAFKA-13197: --- Thanks for filing this [~twbecker]. The current doc says "If {@code keyValueMapper} returns {@code null} implying no match exists, no output record will be added to the resulting {@code KStream}." But I read the tickets and I think you are right: this property is not very reasonable since users may want to know if a single stream record does not find any matching results as well. I think the reasonable behavior (and the java doc should be updated accordingly) in KAFKA-10277 should be {code} If the keyValueMapper returns null implying no matching key found, the ValueJoiner would still be triggered with (null, v, null). {code} Does that sound right to you? > KStream-GlobalKTable join semantics don't match documentation > - > > Key: KAFKA-13197 > URL: https://issues.apache.org/jira/browse/KAFKA-13197 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0 >Reporter: Tommy Becker >Priority: Major > > As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was > changed. It appears the change was intended to merely relax a requirement but > it actually broke backwards compatibility. Although it does allow {{null}} > keys and values in the KStream to be joined, it now excludes {{null}} results > of the {{KeyValueMapper}}. We have an application which can return {{null}} > from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on > these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still > explicitly says this is done: > {quote}If a KStream input record key or value is null the record will not be > included in the join operation and thus no output record will be added to the > resulting KStream. > If keyValueMapper returns null implying no match exists, a null value will > be provided to ValueJoiner. > {quote} > Both these statements are incorrect. > I think the new behavior is worse than the previous/documented behavior. It > feels more reasonable to have a non-null stream record map to a null join key > (our use-case is event-enhancement where the incoming record doesn't have the > join field), than the reverse. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400651#comment-17400651 ] Guozhang Wang commented on KAFKA-13152: --- Yeah that's a good point. I think we should not pause partitions that have no data yet. Maybe we can modify 1) above as to "pause all partitions that have some data already". As for "fairness", I think this is either achieved or we've lost it at the consumer level, as we do round-robin fetching across all assigned partitions. And let's say if some partition A's message size is larger than partition B's message size, and assume their income record rate are the same, then partition B would have more records fetched than partition A on average. > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > The current config "buffered.records.per.partition" controls how many records > in maximum to bookkeep, and hence it is exceed we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config with a global, e.g. "input.buffer.max.bytes" > which controls how much bytes in total is allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400647#comment-17400647 ] Matthias J. Sax commented on KAFKA-13152: - For (1), I think we need to take time-synchronization into account. If a task has multiple partitions, and some are empty, we might delay processing base on `task.max.idle.ms` config – however, we should always allow to fetch for the partitions of empty buffers and never pause them; otherwise, we would sacrifice ordered processing and I think a tradeoff between semantics and "buffer size" would not be a good one? We could even end up in a "temporal deadlock": no task is processable as it has at least one empty buffer, but all partitions are paused because we exceeded the max-buffer-space – the deadlock is temporal, because we would go into "forced processing" after `task.max.idle.ms` passed though – or we need to change the behavior and go into "forced processed" right away for this case without waiting for `max.task.idle.ms` (but it might be desirable to ignore `task.max.idle.ms`). Another question I have is about "balanced fetching": atm, we use the same buffer space for each partition and pause a single partition if its buffer space is exceeded. If we follow (1), could it happen that some partitions buffer much more data than others, and could this become a "fairness" problem? In the end, I agree that not have the exact same buffer space across all partitions can be beneficial: a high volume topic might be better off using more buffer space than a low volume topic. However, I am wonder if would still need some bounds to avoid that we go from the current extreme to give the exact same buffer space per partitions, to the opposite extreme for which some partitions might "starve" as their buffer space becomes too small? > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > The current config "buffered.records.per.partition" controls how many records > in maximum to bookkeep, and hence it is exceed we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config with a global, e.g. "input.buffer.max.bytes" > which controls how much bytes in total is allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13203) GlobalTopic is added to main consumer subscription
[ https://issues.apache.org/jira/browse/KAFKA-13203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400645#comment-17400645 ] Guozhang Wang commented on KAFKA-13203: --- Which version of Kafka is this issue reported? I just checked the source code on trunk and from what I read this cannot happen. > GlobalTopic is added to main consumer subscription > -- > > Key: KAFKA-13203 > URL: https://issues.apache.org/jira/browse/KAFKA-13203 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > From [https://github.com/confluentinc/kafka-streams-examples/issues/351] > {quote}I have the following topology. > {{Topology topology = new Topology();}} > {{//WS connection processor}} > {{topology}} > {{ .addSource(SOURCE1, new StringDeserializer(), new > WebSocketConnectionEventDeserializer(), "topic1")}} > {{ .addProcessor(PROCESSOR1, Processor1::new, SOURCE1)}} > {{ .addStateStore(sessionStoreBuilder, PROCESSOR1)}} > {{ .addSink(WS_STATUS_SINK, "sinktopic", stringSerializer, stringSerializer, > PROCESSOR1)}} > {{}} > {{//Service discovery}} > {{ .addGlobalStore(nodeTopicDiscoveryStoreBuilder, SOURCE3, new > StringDeserializer(), > new ServiceDiscoveryEventDeserializer(), "compacttopic", > GLOBALSTOREPROCESSOR, > GlobalStateStoreBuilder::new)}} > {{}} > {{//WS session routing}} > {{ .addSource(SOURCE2, new StringDeserializer(), new StringDeserializer(), > "topic2")}} > {{ .addProcessor(PROCESSOR2, Processor2::new, > SOURCE2)}} > {{ .addStateStore(userConnectedNodesStoreBuilder, PROCESSOR2, PROCESSOR1);}} > {{}} > {{streams = new KafkaStreams(topology, kafkaStreamProperties); }} > While running the application, I get the following error > {{Consumer > clientId=ws-stream-processor-6115be26-b6e0-49d7-9c47-d9215bfcfea8-StreamThread-1-consumer, > groupId=ws-stream-processor] The following subscribed topics are not > assigned to any members: [compacttopic]}} > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400641#comment-17400641 ] Guozhang Wang commented on KAFKA-13152: --- [~sagarrao] [~mjsax] (also cc [~desai.p.rohan]) I had a slightly different idea (probably a more complex one, just to admit) when filing the ticket. It is indeed a global config controlling the total number of bytes used for source partition record buffering, but it would not be distributed across all threads / tasks, instead we just monitor the aggregated total bytes across all tasks within the instance, when it has been reached, we can consider several options: 1) just pause all partitions; and then resume all partitions when it has dropped below the threshold. Not sure if it would result much "thrashing" on pausing / resuming, but since these actions are quite cheap anyways I'm not too worried about that. 2) pause some partitions, e.g. one heuristic is to pick the partition with most bytes; and then resume all paused partitions when it has dropped below the threshold. Personally I'm leaning towards 1) for now for simplicity, and we can consider if this is sufficient after observing its behavior in production later. > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > The current config "buffered.records.per.partition" controls how many records > in maximum to bookkeep, and hence it is exceed we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config with a global, e.g. "input.buffer.max.bytes" > which controls how much bytes in total is allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13198) TopicsDelta doesn't update deleted topic when processing PartitionChangeRecord
[ https://issues.apache.org/jira/browse/KAFKA-13198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-13198. - Resolution: Fixed merged the PR to 3.0 and trunk. > TopicsDelta doesn't update deleted topic when processing PartitionChangeRecord > -- > > Key: KAFKA-13198 > URL: https://issues.apache.org/jira/browse/KAFKA-13198 > Project: Kafka > Issue Type: Bug > Components: kraft, replication >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Labels: kip-500 > Fix For: 3.0.0 > > > In KRaft when a replica gets reassigned away from a topic partition we are > not notifying the {{ReplicaManager}} to stop the replica. > On solution is to track those topic partition ids when processing > {{PartitionChangeRecord}} and to returned them as {{deleted}} when the > replica manager calls {{calculateDeltaChanges}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao merged pull request #11216: KAFKA-13198: Stop replicas when reassigned
junrao merged pull request #11216: URL: https://github.com/apache/kafka/pull/11216 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails
rondagostino commented on pull request #6233: URL: https://github.com/apache/kafka/pull/6233#issuecomment-900565005 You can get the specified versions at https://kafka.apache.org/downloads. This fix appears in the release notes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11213: KAFKA-13201: Convert KTable aggregations to new PAPI
vvcephei commented on a change in pull request #11213: URL: https://github.com/apache/kafka/pull/11213#discussion_r690642563 ## File path: streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.MockProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.StreamTask; +import org.apache.kafka.streams.processor.internals.Task.TaskType; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.internals.ThreadCache; +import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; + +import java.io.File; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +public class MockInternalNewProcessorContext extends MockProcessorContext implements InternalProcessorContext { Review comment: It looks like this is copied and modified from https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java . Is that right? We also have https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java , and it's been a longstanding thorn in our side that there's a proliferation of these context implementations. I'm hoping that by the time we're done with all these migrations, we can actually converge on this new class and delete the other two. ## File path: streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.MockProcessorContext; Review comment: This is the one defined in `test-utils`, right? Should we be using the new PAPI one (https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java) instead? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java ## @@ -150,16 +153,16 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte } @Override -public void process(final K key, final Change value) { +public void process(final Record> record) { observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp()); -buffer(key, value); +buffer(record);
[jira] [Resolved] (KAFKA-13207) Replica fetcher should not update partition state on diverging epoch if partition removed from fetcher
[ https://issues.apache.org/jira/browse/KAFKA-13207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-13207. Resolution: Fixed > Replica fetcher should not update partition state on diverging epoch if > partition removed from fetcher > -- > > Key: KAFKA-13207 > URL: https://issues.apache.org/jira/browse/KAFKA-13207 > Project: Kafka > Issue Type: New Feature > Components: core >Affects Versions: 2.8.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Critical > Fix For: 3.0.0, 2.8.1 > > > {{AbstractFetcherThread#truncateOnFetchResponse}}{color:#24292e} is used with > IBP 2.7 and above to truncate partitions based on diverging epoch returned in > fetch responses. Truncation should only be performed for partitions that are > still owned by the fetcher and this check should be done while holding > {color}{{partitionMapLock}}{color:#24292e} to ensure that any partitions > removed from the fetcher thread are not truncated{color} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rajinisivaram commented on pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher
rajinisivaram commented on pull request #11221: URL: https://github.com/apache/kafka/pull/11221#issuecomment-900554192 @dajac Yes, merged to 3.0 and 2.8 as well, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13209) Upgrade jetty-server to fix CVE-2021-34429
[ https://issues.apache.org/jira/browse/KAFKA-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-13209. Resolution: Fixed > Upgrade jetty-server to fix CVE-2021-34429 > -- > > Key: KAFKA-13209 > URL: https://issues.apache.org/jira/browse/KAFKA-13209 > Project: Kafka > Issue Type: Improvement >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.0.0, 2.7.2, 2.8.1 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13209) Upgrade jetty-server to fix CVE-2021-34429
[ https://issues.apache.org/jira/browse/KAFKA-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-13209: --- Fix Version/s: (was: 2.7.1) 2.7.2 > Upgrade jetty-server to fix CVE-2021-34429 > -- > > Key: KAFKA-13209 > URL: https://issues.apache.org/jira/browse/KAFKA-13209 > Project: Kafka > Issue Type: Improvement >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.0.0, 2.7.2, 2.8.1 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13209) Upgrade jetty-server to fix CVE-2021-34429
[ https://issues.apache.org/jira/browse/KAFKA-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-13209: --- Fix Version/s: 2.7.1 2.8.1 3.0.0 > Upgrade jetty-server to fix CVE-2021-34429 > -- > > Key: KAFKA-13209 > URL: https://issues.apache.org/jira/browse/KAFKA-13209 > Project: Kafka > Issue Type: Improvement >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.0.0, 2.7.1, 2.8.1 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13209) Upgrade jetty-server to fix CVE-2021-34429
[ https://issues.apache.org/jira/browse/KAFKA-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-13209: -- Assignee: Justine Olshan > Upgrade jetty-server to fix CVE-2021-34429 > -- > > Key: KAFKA-13209 > URL: https://issues.apache.org/jira/browse/KAFKA-13209 > Project: Kafka > Issue Type: Improvement >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher
dajac commented on pull request #11221: URL: https://github.com/apache/kafka/pull/11221#issuecomment-900541736 @rajinisivaram Should we also add it to 2.8 branch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r690620725 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -113,7 +113,7 @@ class BrokerServer( var dynamicConfigHandlers: Map[String, ConfigHandler] = null - var replicaManager: ReplicaManager = null + @volatile private[this] var _replicaManager: ReplicaManager = null Review comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy merged pull request #11224: KAFKA-13209: Upgrade jetty-server to fix CVE-2021-34429
omkreddy merged pull request #11224: URL: https://github.com/apache/kafka/pull/11224 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] venkatesh010 commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails
venkatesh010 commented on pull request #6233: URL: https://github.com/apache/kafka/pull/6233#issuecomment-900514731 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails
rondagostino commented on pull request #6233: URL: https://github.com/apache/kafka/pull/6233#issuecomment-900457425 .. and yes, restarting the client will get the client out of the bad state if you happen to be using a version of the code with this bug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails
rondagostino commented on pull request #6233: URL: https://github.com/apache/kafka/pull/6233#issuecomment-900456795 @venkatesh010 https://issues.apache.org/jira/browse/KAFKA-7902 says `Fix Version/s: 2.2.0, 2.1.1` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan opened a new pull request #11224: KAFKA-13209: Upgrade jetty-server to fix CVE-2021-34429
jolshan opened a new pull request #11224: URL: https://github.com/apache/kafka/pull/11224 Upgrading to 9.4.43.v20210629 Release notes: https://github.com/eclipse/jetty.project/releases/tag/jetty-9.4.43.v20210629 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram merged pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher
rajinisivaram merged pull request #11221: URL: https://github.com/apache/kafka/pull/11221 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher
rajinisivaram commented on pull request #11221: URL: https://github.com/apache/kafka/pull/11221#issuecomment-900433010 @dajac @hachikuji Thanks for the reviews, merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13209) Upgrade jetty-server to fix CVE-2021-34429
[ https://issues.apache.org/jira/browse/KAFKA-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-13209: --- Summary: Upgrade jetty-server to fix CVE-2021-34429 (was: Upgrade jetty-server to 9.4.43.v20210629) > Upgrade jetty-server to fix CVE-2021-34429 > -- > > Key: KAFKA-13209 > URL: https://issues.apache.org/jira/browse/KAFKA-13209 > Project: Kafka > Issue Type: Improvement >Reporter: Justine Olshan >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13209) Upgrade jetty-server to 9.4.43.v20210629
Justine Olshan created KAFKA-13209: -- Summary: Upgrade jetty-server to 9.4.43.v20210629 Key: KAFKA-13209 URL: https://issues.apache.org/jira/browse/KAFKA-13209 Project: Kafka Issue Type: Improvement Reporter: Justine Olshan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
junrao commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r690492221 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -113,7 +113,7 @@ class BrokerServer( var dynamicConfigHandlers: Map[String, ConfigHandler] = null - var replicaManager: ReplicaManager = null + @volatile private[this] var _replicaManager: ReplicaManager = null Review comment: Could you make the same change for KafkaServer too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r690455391 ## File path: core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala ## @@ -410,6 +421,32 @@ class KRaftClusterTest { } } + private def checkReplicaManager(cluster: KafkaClusterTestKit, expectedHosting: List[(Int, List[Boolean])]): Unit = { +for ((brokerId, partitionsIsHosted) <- expectedHosting) { + val broker = cluster.brokers().get(brokerId) + // lock and unlock so we can read the replica manager Review comment: I cleaned this up by just making `var replicaManager` volatile. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r690442367 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -2074,48 +2073,23 @@ class ReplicaManager(val config: KafkaConfig, } } - private[kafka] def calculateDeltaChanges(delta: TopicsDelta) -: (mutable.HashMap[TopicPartition, Boolean], - mutable.HashMap[TopicPartition, LocalLeaderInfo], - mutable.HashMap[TopicPartition, LocalLeaderInfo]) = { -val deleted = new mutable.HashMap[TopicPartition, Boolean]() -delta.deletedTopicIds().forEach { topicId => - val topicImage = delta.image().getTopic(topicId) - topicImage.partitions().keySet().forEach { partitionId => -deleted.put(new TopicPartition(topicImage.name(), partitionId), true) - } -} -val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]() -val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]() -delta.changedTopics().values().forEach { topicDelta => - topicDelta.newLocalLeaders(config.nodeId).forEach { e => -newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey), - LocalLeaderInfo(topicDelta.id(), e.getValue)) - } - topicDelta.newLocalFollowers(config.nodeId).forEach { e => -newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey), - LocalLeaderInfo(topicDelta.id(), e.getValue)) - } -} -(deleted, newLocalLeaders, newLocalFollowers) - } - /** * Apply a KRaft topic change delta. * * @param newImageThe new metadata image. * @param delta The delta to apply. */ def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = { -// Before taking the lock, build some hash maps that we will need. -val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta) +// Before taking the lock, compute the local changes +val localChanges = delta.localChanges(config.nodeId) replicaStateChangeLock.synchronized { // Handle deleted partitions. We need to do this first because we might subsequently // create new partitions with the same names as the ones we are deleting here. - if (!deleted.isEmpty) { -stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).") -stopPartitions(deleted).foreach { case (topicPartition, e) => + if (!localChanges.deletes.isEmpty) { Review comment: @junrao, I added a test for this and split up the existing test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher
dajac commented on a change in pull request #11221: URL: https://github.com/apache/kafka/pull/11221#discussion_r690383569 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -262,22 +263,29 @@ abstract class AbstractFetcherThread(name: String, val partitionsWithError = mutable.HashSet.empty[TopicPartition] fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) => - Errors.forCode(leaderEpochOffset.errorCode) match { -case Errors.NONE => - val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset) - info(s"Truncating partition $tp with $offsetTruncationState due to leader epoch and offset $leaderEpochOffset") - if (doTruncate(tp, offsetTruncationState)) -fetchOffsets.put(tp, offsetTruncationState) - -case Errors.FENCED_LEADER_EPOCH => - val currentLeaderEpoch = latestEpochsForPartitions.get(tp) -.map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava - if (onPartitionFenced(tp, currentLeaderEpoch)) + if (partitionStates.contains(tp)) { +Errors.forCode(leaderEpochOffset.errorCode) match { + case Errors.NONE => +val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset) +info(s"Truncating partition $tp with $offsetTruncationState due to leader epoch and offset $leaderEpochOffset") +if (doTruncate(tp, offsetTruncationState)) + fetchOffsets.put(tp, offsetTruncationState) + + case Errors.FENCED_LEADER_EPOCH => +val currentLeaderEpoch = latestEpochsForPartitions.get(tp) + .map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava +if (onPartitionFenced(tp, currentLeaderEpoch)) + partitionsWithError += tp + + case error => +info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error") partitionsWithError += tp - -case error => - info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error") - partitionsWithError += tp +} + } else { +// Partitions may have been removed from the fetcher while the thread was waiting for fetch +// response. Removed partitions are filtered out while holding `partitionMapLock` to ensure that we +// don't update state for any partition that may have already been migrated to another thread. +trace(s"Ignoring epoch offsets for partition '$tp' since it has been removed from this fetcher thread.") Review comment: nit: I would remove the simple quotes around `tp` to remain consistent with the other logs above (or the other way around). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12421) Improve controller's atomic grouping
[ https://issues.apache.org/jira/browse/KAFKA-12421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao reassigned KAFKA-12421: --- Assignee: HaiyuanZhao > Improve controller's atomic grouping > > > Key: KAFKA-12421 > URL: https://issues.apache.org/jira/browse/KAFKA-12421 > Project: Kafka > Issue Type: Improvement > Components: controller >Reporter: Jose Armando Garcia Sancio >Assignee: HaiyuanZhao >Priority: Major > Labels: kip-500 > > The current controller implementation atomically appends to the metadata log > by making sure that all required records are on the same batch. The > controller groups all of the records that result from an RPC into one batch. > Some of the RPCs are: > # Client quota changes > # Configuration changes > # Feature changes > # Topic creation > This is good enough for correctness but it is more aggressive than necessary. > For example, for topic creation since errors are reported independently, the > controller only needs to guarantee that all of the records for one topic are > committed atomically. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13196) MirrorMaker 2 not always start tasks
[ https://issues.apache.org/jira/browse/KAFKA-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jozef Vilcek updated KAFKA-13196: - Description: I am using MirrorMaker 2.0 and running it via [ MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] class. This method will start up `DistributedHerder` and will use non-functional `advertisedUrl`, and therefore workers can not talk to each other and coordinate. After upgrading from version `2.4.0` to `2.7.1` I noticed that when I am starting up mirror maker it does not always start tasks - just connector is running. Doing some amount of stop/starts will eventually start tasks too. After a bit of digging I did notice that in attempt to configure connector's task, code ends up in this [1] branch, where configure request is being forwarded to the leader. For some reason, task configuration is not done on leader. However, MirrorMaker does not pack RestServer and therefore that request will never succeed. I am not sure what is going no or why it does seem to work better on 2.4.0. I noticed that connector start procedure did involve less callbacks on 2.4.0 in connector start sequence [2]. [1] [https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494] [2] [https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236] was: I am using MirrorMaker 2.0 and running it via [ MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] class. This method will start up `DistributedHerder` and will use non-functional `advertisedUrl`, and therefore workers can not talk to each other and coordinate. After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am starting up mirror maker ti does not always start tasks - just connector is executing. Doing some amount of stop and start it will eventually start tasks too. After a bit of digging I did noticed that in attempt to configure connector's task, code ends up in this [1] branch, where configure request is being forwarded to the leader. From some reason, task configuration is not done on leader. However, MirrorMaker does not pack RestServer and therefore that request will never succeed. I am not sure what is going no or why it does seem to work better on 2.4.0. I noticed that connector start procedure did involve less callbacks on 2.4.0 in connector start sequence [2]. [1] [https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494] [2] [https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236] > MirrorMaker 2 not always start tasks > > > Key: KAFKA-13196 > URL: https://issues.apache.org/jira/browse/KAFKA-13196 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.1 >Reporter: Jozef Vilcek >Priority: Major > > I am using MirrorMaker 2.0 and running it via [ > MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] > class. This method will start up `DistributedHerder` and will use > non-functional `advertisedUrl`, and therefore workers can not talk to each > other and coordinate. > After upgrading from version `2.4.0` to `2.7.1` I noticed that when I am > starting up mirror maker it does not always start tasks - just connector is > running. Doing some amount of stop/starts will eventually start tasks too. > After a bit of digging I did notice that in attempt to configure connector's > task, code ends up in this [1] branch, where configure request is being > forwarded to the leader. For some reason, task configuration is not done on > leader. However, MirrorMaker does not pack RestServer and therefore that > request will never succeed. > I am not sure what is going no or why it does seem to work better on 2.4.0. I > noticed that connector start procedure did involve less callbacks on 2.4.0 in > connector start sequence [2]. > > > [1] > [https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494] > [2] > [https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rajinisivaram commented on a change in pull request #11221: KAFKA-13207: Don't partition state on fetch response with diverging epoch if partition removed from fetcher
rajinisivaram commented on a change in pull request #11221: URL: https://github.com/apache/kafka/pull/11221#discussion_r690150564 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -268,7 +262,10 @@ abstract class AbstractFetcherThread(name: String, val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val partitionsWithError = mutable.HashSet.empty[TopicPartition] -fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) => +// Partitions may have been removed from the fetcher while the thread was waiting for fetch +// response. Filter out removed partitions while holding `partitionMapLock` to ensure that we +// don't update state for any partition that may have already been migrated to another thread. +fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }.forKeyValue { (tp, leaderEpochOffset) => Review comment: @hachikuji Thanks for the review, updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] venkatesh010 commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails
venkatesh010 commented on pull request #6233: URL: https://github.com/apache/kafka/pull/6233#issuecomment-900070176 And is restarting service a temporary solution for it? As loginContext will get created again post restart of client -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] venkatesh010 commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails
venkatesh010 commented on pull request #6233: URL: https://github.com/apache/kafka/pull/6233#issuecomment-900069794 @rondagostino @rajinisivaram which version of apache Kafka client or if I'm using spring cloud stream, which version of spring-kafka should I use..to incorporate this fix? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13032) Impossible stacktrace
[ https://issues.apache.org/jira/browse/KAFKA-13032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanwen Lin reassigned KAFKA-13032: -- Assignee: Yanwen Lin > Impossible stacktrace > - > > Key: KAFKA-13032 > URL: https://issues.apache.org/jira/browse/KAFKA-13032 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.8.0 >Reporter: Niclas Hedhman >Assignee: Yanwen Lin >Priority: Minor > Labels: beginner, easy-fix > > I am presented with a stacktrace that has not a single touch point in my > code, so it is incredibly difficult to figure out where the problem could be. > I think more RuntimeExceptions need to be caught and pull out information at > each level that is providing any additional hint of where we are. > For instance, each node could prepend its reference/name and one would have a > chance to see where we are... > ``` > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_140, processor=KSTREAM-SOURCE-00, topic=_poll, > partition=140, offset=0, stacktrace=java.lang.NullPointerException > at > org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:268) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214) > at > org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:50) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at >
[GitHub] [kafka] edipesh19 closed pull request #11223: Upgrade zookeeper version to 3.6.3
edipesh19 closed pull request #11223: URL: https://github.com/apache/kafka/pull/11223 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] edipesh19 opened a new pull request #11223: Upgrade zookeeper version to 3.6.3
edipesh19 opened a new pull request #11223: URL: https://github.com/apache/kafka/pull/11223 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org