[GitHub] [kafka] Justinwins commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy

2021-08-17 Thread GitBox


Justinwins commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r684042834



##
File path: docs/upgrade.html
##
@@ -83,7 +83,13 @@ Notable changes in 3
 understood by brokers of version 2.5 or higher, so you must upgrade 
your Kafka cluster to get the stronger semantics. Otherwise, you can just pass
 in new ConsumerGroupMetadata(consumerGroupId) to work 
with older brokers. See KIP-732 (https://cwiki.apache.org/confluence/x/zJONCg) for more 
details.
 
-
+ The Connect-based MirrorMaker (MM2) includes changes to support 
IdentityReplicationPolicy, enabling replication without renaming 
topics.
+The existing DefaultReplicationPolicy is still used by 
default, but identity replication can be enabled via the
+replication.policy configuration property. This is 
especially useful for users migrating from the older MirrorMaker (MM1), or for

Review comment:
   I think it's clearer to say "replication.policy.class" here, since that is 
the exact form in which it is configured in the mm2.properties file.
   
   Friendlier to beginners.
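   
   For reference, a minimal mm2.properties sketch (the cluster aliases and topic pattern are hypothetical; only the last line is the policy switch):
   
   ```properties
   clusters = primary, backup
   primary->backup.enabled = true
   primary->backup.topics = .*
   # replicate topics under their original names instead of with the "primary." prefix
   replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy
   ```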
   








[jira] [Created] (KAFKA-13210) fetch/findSessions queries with open endpoints for SessionStore/WindowStore

2021-08-17 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13210:
-

 Summary: fetch/findSessions queries with open endpoints for 
SessionStore/WindowStore
 Key: KAFKA-13210
 URL: https://issues.apache.org/jira/browse/KAFKA-13210
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Luke Chen
Assignee: Luke Chen


This is the implementation of KIP-766: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186876596]
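
A hedged sketch of the intended usage (assuming, per the KIP, that a null bound means an 
open endpoint; the store name and value types are hypothetical):

{code}
import java.time.Instant;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;

// print all windows for keys up to maxKey; the null "from" key is the open endpoint
static void dumpUpTo(final KafkaStreams streams, final String maxKey) {
    final ReadOnlyWindowStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.windowStore()));
    try (final KeyValueIterator<Windowed<String>, Long> iter =
             store.fetch(null, maxKey, Instant.EPOCH, Instant.now())) {
        iter.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
    }
}
{code}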

 





[GitHub] [kafka] guozhangwang commented on pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0

2021-08-17 Thread GitBox


guozhangwang commented on pull request #11184:
URL: https://github.com/apache/kafka/pull/11184#issuecomment-900710127


   Cherry-picked to 3.0 to be included in the 3.0 docs cc @kkonstantine 






[GitHub] [kafka] guozhangwang merged pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0

2021-08-17 Thread GitBox


guozhangwang merged pull request #11184:
URL: https://github.com/apache/kafka/pull/11184


   






[GitHub] [kafka] guozhangwang commented on a change in pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0

2021-08-17 Thread GitBox


guozhangwang commented on a change in pull request #11184:
URL: https://github.com/apache/kafka/pull/11184#discussion_r690792840



##
File path: docs/streams/upgrade-guide.html
##
@@ -52,6 +52,15 @@ Upgrade Guide and API Changes
  restart all new ({{fullDotVersion}}) application instances 
 
 
+
+Since the 3.0.0 release, Kafka Streams uses a newer RocksDB version which 
bumped the footer version it persists in its files.

Review comment:
   Ack.

##
File path: docs/streams/upgrade-guide.html
##
@@ -52,6 +52,15 @@ Upgrade Guide and API Changes
  restart all new ({{fullDotVersion}}) application instances 
 
 
+
+Since the 3.0.0 release, Kafka Streams uses a newer RocksDB version which 
bumped the footer version it persists in its files.
+This means that an older RocksDB version would not be able to recognize 
the bytes written by that newer RocksDB version,

Review comment:
   Ack.








[GitHub] [kafka] guozhangwang commented on a change in pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0

2021-08-17 Thread GitBox


guozhangwang commented on a change in pull request #11184:
URL: https://github.com/apache/kafka/pull/11184#discussion_r690792216



##
File path: docs/streams/upgrade-guide.html
##
@@ -52,6 +52,15 @@ Upgrade Guide and API Changes
  restart all new ({{fullDotVersion}}) application instances 
 
 
+
+Since the 3.0.0 release, Kafka Streams uses a newer RocksDB version which 
bumped the footer version it persists in its files.
+This means that an older RocksDB version would not be able to recognize 
the bytes written by that newer RocksDB version,
+and hence it is harder to downgrade Kafka Streams from version 3.0.0 
or newer to older versions in-flight.
+Users need to wipe out the local RocksDB state stores written by the 
newer-versioned Kafka Streams before swapping in the
+older-versioned Kafka Streams bytecode, which will then restore the 
state stores with the old-versioned footer from the

Review comment:
   ack.
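
   As a concrete, hedged sketch of the wipe step (not the documented procedure; `KafkaStreams#cleanUp()` deletes this instance's local state directories and must be called while the instance is not running):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StreamsConfig;
   
   // Before starting the older-versioned bytecode: wipe the RocksDB state written by
   // the 3.0.0+ instance, so it is restored from the changelog topics on restart.
   Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // hypothetical app id
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical bootstrap
   KafkaStreams streams = new KafkaStreams(topology, props);             // topology: assumed in scope
   streams.cleanUp();  // deletes the local state store directories for this application id
   ```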








[jira] [Commented] (KAFKA-8295) Add merge() operator to State Stores.

2021-08-17 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400672#comment-17400672
 ] 

Guozhang Wang commented on KAFKA-8295:
--

Just to add some context here: we are having some thoughts about refactoring 
some state stores for windowed joins, where list-append types would be 
considered (cc [~vcrfxia] here). We can probably consider doing this together 
with the list-append types since it would be a major refactoring.

> Add merge() operator to State Stores.
> -
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count()
>  
> (Note: Unfortunately it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++ implemented 
> aggregator – otherwise we incur too much cost crossing the jni for a net 
> performance benefit)
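
For illustration, a minimal RocksJava sketch of the built-in uint64add counter merge 
operator (the DB path and key are hypothetical; values are 8-byte little-endian):

{code}
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class MergeCounterDemo {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options()
                 .setCreateIfMissing(true)
                 .setMergeOperatorName("uint64add");   // built-in C++ counter operator
             RocksDB db = RocksDB.open(options, "/tmp/merge-demo")) {
            final byte[] key = "count".getBytes();
            final byte[] one = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(1L).array();
            db.merge(key, one);   // read-modify-write in a single operation
            db.merge(key, one);   // no get() needed between increments
            final long total = ByteBuffer.wrap(db.get(key)).order(ByteOrder.LITTLE_ENDIAN).getLong();
            System.out.println(total);   // 2
        }
    }
}
{code}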





[jira] [Commented] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation

2021-08-17 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400653#comment-17400653
 ] 

Guozhang Wang commented on KAFKA-13197:
---

Thanks for filing this [~twbecker]. The current doc says "If {@code 
keyValueMapper} returns {@code null} implying no match exists, no output record 
will be added to the resulting {@code KStream}." But I read the tickets and I 
think you are right: this behavior is not very reasonable, since users may also want 
to know when a single stream record does not find any matching result.

I think the reasonable behavior (and the Javadoc should be updated 
accordingly) in KAFKA-10277 should be

{code}
If the keyValueMapper returns null implying no matching key found, the 
ValueJoiner would still be triggered with (null, v, null).
{code}

Does that sound right to you?
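
For context, a hedged sketch of the join shape being discussed (the topic names and the Event / Profile / Enriched types are hypothetical):

{code}
// builder is an org.apache.kafka.streams.StreamsBuilder (assumed in scope)
KStream<String, Event> events = builder.stream("events");
GlobalKTable<String, Profile> profiles = builder.globalTable("profiles");

events.join(profiles,
    (key, event) -> event.getUserId(),                 // KeyValueMapper; may return null, i.e. no join key
    (event, profile) -> new Enriched(event, profile)); // proposed: profile is null when the mapper returns null
{code}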

> KStream-GlobalKTable join semantics don't match documentation
> -
>
> Key: KAFKA-13197
> URL: https://issues.apache.org/jira/browse/KAFKA-13197
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0
>Reporter: Tommy Becker
>Priority: Major
>
> As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was 
> changed. It appears the change was intended to merely relax a requirement but 
> it actually broke backwards compatibility. Although it does allow {{null}} 
> keys and values in the KStream to be joined, it now excludes {{null}} results 
> of the {{KeyValueMapper}}. We have an application which can return {{null}} 
> from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on 
> these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still 
> explicitly says this is done:
> {quote}If a KStream input record key or value is null the record will not be 
> included in the join operation and thus no output record will be added to the 
> resulting KStream.
>  If keyValueMapper returns null implying no match exists, a null value will 
> be provided to ValueJoiner.
> {quote}
> Both these statements are incorrect.
> I think the new behavior is worse than the previous/documented behavior. It 
> feels more reasonable to have a non-null stream record map to a null join key 
> (our use-case is event-enhancement where the incoming record doesn't have the 
> join field), than the reverse.





[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-17 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400651#comment-17400651
 ] 

Guozhang Wang commented on KAFKA-13152:
---

Yeah that's a good point. I think we should not pause partitions that have no 
data yet. Maybe we can modify 1) above to "pause all partitions that have 
some data already".

As for "fairness", I think this is either achieved or we've lost it at the 
consumer level, as we do round-robin fetching across all assigned partitions. 
And let's say partition A's message size is larger than partition B's; 
assuming their incoming record rates are the same, partition 
B would have more records fetched than partition A on average.

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum number of 
> records to bookkeep, and hence when it is exceeded we would pause fetching from 
> this partition. However this config has two issues:
> * It's a per-partition config, so the total memory consumed is dependent on 
> the dynamic number of partitions assigned.
> * Record size could vary from case to case.
> And hence it's hard to bound the memory usage for this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .





[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400647#comment-17400647
 ] 

Matthias J. Sax commented on KAFKA-13152:
-

For (1), I think we need to take time-synchronization into account. If a task 
has multiple partitions, and some are empty, we might delay processing based on 
the `max.task.idle.ms` config; however, we should always allow fetching for the 
partitions with empty buffers and never pause them; otherwise, we would sacrifice 
ordered processing, and I think a tradeoff between semantics and "buffer size" 
would not be a good one. We could even end up in a "temporal deadlock": no task 
is processable as it has at least one empty buffer, but all partitions are 
paused because we exceeded the max-buffer-space. The deadlock is temporal, 
because we would go into "forced processing" after `max.task.idle.ms` passed; 
or we could change the behavior and go into "forced processing" right away 
for this case without waiting (it might indeed be desirable to ignore 
`max.task.idle.ms` here).

Another question I have is about "balanced fetching": atm, we use the same 
buffer space for each partition and pause a single partition if its buffer 
space is exceeded. If we follow (1), could it happen that some partitions 
buffer much more data than others, and could this become a "fairness" problem? 
In the end, I agree that not having the exact same buffer space across all 
partitions can be beneficial: a high volume topic might be better off using 
more buffer space than a low volume topic. However, I wonder if we would still 
need some bounds to avoid going from the current extreme of giving the exact 
same buffer space per partition to the opposite extreme, in which some 
partitions might "starve" as their buffer space becomes too small.

 

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum number of 
> records to bookkeep, and hence when it is exceeded we would pause fetching from 
> this partition. However this config has two issues:
> * It's a per-partition config, so the total memory consumed is dependent on 
> the dynamic number of partitions assigned.
> * Record size could vary from case to case.
> And hence it's hard to bound the memory usage for this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .





[jira] [Commented] (KAFKA-13203) GlobalTopic is added to main consumer subscription

2021-08-17 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400645#comment-17400645
 ] 

Guozhang Wang commented on KAFKA-13203:
---

Which version of Kafka was this issue reported against? I just checked the source code 
on trunk, and from what I read this cannot happen.

> GlobalTopic is added to main consumer subscription
> --
>
> Key: KAFKA-13203
> URL: https://issues.apache.org/jira/browse/KAFKA-13203
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> From [https://github.com/confluentinc/kafka-streams-examples/issues/351]
> {quote}I have the following topology.
> Topology topology = new Topology();
> //WS connection processor
> topology
>   .addSource(SOURCE1, new StringDeserializer(), new WebSocketConnectionEventDeserializer(), "topic1")
>   .addProcessor(PROCESSOR1, Processor1::new, SOURCE1)
>   .addStateStore(sessionStoreBuilder, PROCESSOR1)
>   .addSink(WS_STATUS_SINK, "sinktopic", stringSerializer, stringSerializer, PROCESSOR1)
>
> //Service discovery
>   .addGlobalStore(nodeTopicDiscoveryStoreBuilder, SOURCE3, new StringDeserializer(),
>     new ServiceDiscoveryEventDeserializer(), "compacttopic", GLOBALSTOREPROCESSOR,
>     GlobalStateStoreBuilder::new)
>
> //WS session routing
>   .addSource(SOURCE2, new StringDeserializer(), new StringDeserializer(), "topic2")
>   .addProcessor(PROCESSOR2, Processor2::new, SOURCE2)
>   .addStateStore(userConnectedNodesStoreBuilder, PROCESSOR2, PROCESSOR1);
>
> streams = new KafkaStreams(topology, kafkaStreamProperties);
> While running the application, I get the following error
> [Consumer clientId=ws-stream-processor-6115be26-b6e0-49d7-9c47-d9215bfcfea8-StreamThread-1-consumer, groupId=ws-stream-processor] The following subscribed topics are not assigned to any members: [compacttopic]
> {quote}





[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-17 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400641#comment-17400641
 ] 

Guozhang Wang commented on KAFKA-13152:
---

[~sagarrao] [~mjsax] (also cc [~desai.p.rohan]) I had a slightly different idea 
(probably a more complex one, admittedly) when filing the ticket.

It is indeed a global config controlling the total number of bytes used for 
source partition record buffering, but it would not be distributed across all 
threads / tasks; instead we just monitor the aggregated total bytes across all 
tasks within the instance, and when the threshold has been reached, we can 
consider several options:

1) just pause all partitions; and then resume all partitions when the total has 
dropped below the threshold. Not sure if it would result in much "thrashing" of 
pausing / resuming, but since these actions are quite cheap anyways I'm not too 
worried about that.
2) pause some partitions, e.g. one heuristic is to pick the partition with the 
most bytes; and then resume all paused partitions when the total has dropped 
below the threshold.

Personally I'm leaning towards 1) for now for simplicity, and we can consider 
whether it is sufficient after observing its behavior in production later.
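
A minimal sketch of option 1), assuming a totalBufferedBytes gauge and the proposed 
"input.buffer.max.bytes" threshold are available (this is not the actual Streams implementation):

{code}
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Pause everything once the instance-wide buffered bytes exceed the threshold,
// and resume everything once processing has drained us back below it.
static void maybePauseOrResume(final Consumer<byte[], byte[]> consumer,
                               final long totalBufferedBytes,
                               final long maxBufferedBytes) {
    final Set<TopicPartition> paused = consumer.paused();
    if (paused.isEmpty() && totalBufferedBytes >= maxBufferedBytes) {
        consumer.pause(consumer.assignment());   // cheap: no network call, no rebalance
    } else if (!paused.isEmpty() && totalBufferedBytes < maxBufferedBytes) {
        consumer.resume(paused);
    }
}
{code}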

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum number of 
> records to bookkeep, and hence when it is exceeded we would pause fetching from 
> this partition. However this config has two issues:
> * It's a per-partition config, so the total memory consumed is dependent on 
> the dynamic number of partitions assigned.
> * Record size could vary from case to case.
> And hence it's hard to bound the memory usage for this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .





[jira] [Resolved] (KAFKA-13198) TopicsDelta doesn't update deleted topic when processing PartitionChangeRecord

2021-08-17 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13198.
-
Resolution: Fixed

merged the PR to 3.0 and trunk.

> TopicsDelta doesn't update deleted topic when processing PartitionChangeRecord
> --
>
> Key: KAFKA-13198
> URL: https://issues.apache.org/jira/browse/KAFKA-13198
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> In KRaft when a replica gets reassigned away from a topic partition we are 
> not notifying the {{ReplicaManager}} to stop the replica.
> One solution is to track those topic partition ids when processing 
> {{PartitionChangeRecord}} and to return them as {{deleted}} when the 
> replica manager calls {{calculateDeltaChanges}}.





[GitHub] [kafka] junrao merged pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-17 Thread GitBox


junrao merged pull request #11216:
URL: https://github.com/apache/kafka/pull/11216


   






[GitHub] [kafka] rondagostino commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails

2021-08-17 Thread GitBox


rondagostino commented on pull request #6233:
URL: https://github.com/apache/kafka/pull/6233#issuecomment-900565005


   You can get the specified versions at https://kafka.apache.org/downloads.  
This fix appears in the release notes.






[GitHub] [kafka] vvcephei commented on a change in pull request #11213: KAFKA-13201: Convert KTable aggregations to new PAPI

2021-08-17 Thread GitBox


vvcephei commented on a change in pull request #11213:
URL: https://github.com/apache/kafka/pull/11213#discussion_r690642563



##
File path: 
streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+
+import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+public class MockInternalNewProcessorContext extends 
MockProcessorContext implements InternalProcessorContext {

Review comment:
   It looks like this is copied and modified from 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
 . Is that right?
   
   We also have 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
 , and it's been a longstanding thorn in our side that there's a proliferation 
of these context implementations.
   
   I'm hoping that by the time we're done with all these migrations, we can 
actually converge on this new class and delete the other two.

##
File path: 
streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.MockProcessorContext;

Review comment:
   This is the one defined in `test-utils`, right? Should we be using the 
new PAPI one 
(https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java)
 instead?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
##
@@ -150,16 +153,16 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
 }
 
 @Override
-public void process(final K key, final Change<V> value) {
+public void process(final Record<K, Change<V>> record) {
 observedStreamTime = Math.max(observedStreamTime, 
internalProcessorContext.timestamp());
-buffer(key, value);
+buffer(record);

[jira] [Resolved] (KAFKA-13207) Replica fetcher should not update partition state on diverging epoch if partition removed from fetcher

2021-08-17 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-13207.

Resolution: Fixed

> Replica fetcher should not update partition state on diverging epoch if 
> partition removed from fetcher
> --
>
> Key: KAFKA-13207
> URL: https://issues.apache.org/jira/browse/KAFKA-13207
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 3.0.0, 2.8.1
>
>
> {{AbstractFetcherThread#truncateOnFetchResponse}} is used with 
> IBP 2.7 and above to truncate partitions based on the diverging epoch returned in 
> fetch responses. Truncation should only be performed for partitions that are 
> still owned by the fetcher, and this check should be done while holding 
> {{partitionMapLock}} to ensure that any partitions 
> removed from the fetcher thread are not truncated.





[GitHub] [kafka] rajinisivaram commented on pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher

2021-08-17 Thread GitBox


rajinisivaram commented on pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#issuecomment-900554192


   @dajac Yes, merged to 3.0 and 2.8 as well, thanks.






[jira] [Resolved] (KAFKA-13209) Upgrade jetty-server to fix CVE-2021-34429

2021-08-17 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-13209.

Resolution: Fixed

> Upgrade jetty-server to fix CVE-2021-34429
> --
>
> Key: KAFKA-13209
> URL: https://issues.apache.org/jira/browse/KAFKA-13209
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>






[jira] [Updated] (KAFKA-13209) Upgrade jetty-server to fix CVE-2021-34429

2021-08-17 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-13209:
---
Fix Version/s: (was: 2.7.1)
   2.7.2

> Upgrade jetty-server to fix CVE-2021-34429
> --
>
> Key: KAFKA-13209
> URL: https://issues.apache.org/jira/browse/KAFKA-13209
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>






[jira] [Updated] (KAFKA-13209) Upgrade jetty-server to fix CVE-2021-34429

2021-08-17 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-13209:
---
Fix Version/s: 2.7.1
   2.8.1
   3.0.0

> Upgrade jetty-server to fix CVE-2021-34429
> --
>
> Key: KAFKA-13209
> URL: https://issues.apache.org/jira/browse/KAFKA-13209
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.0.0, 2.7.1, 2.8.1
>
>






[jira] [Assigned] (KAFKA-13209) Upgrade jetty-server to fix CVE-2021-34429

2021-08-17 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-13209:
--

Assignee: Justine Olshan

> Upgrade jetty-server to fix CVE-2021-34429
> --
>
> Key: KAFKA-13209
> URL: https://issues.apache.org/jira/browse/KAFKA-13209
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>






[GitHub] [kafka] dajac commented on pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher

2021-08-17 Thread GitBox


dajac commented on pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#issuecomment-900541736


   @rajinisivaram Should we also add it to 2.8 branch?






[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-17 Thread GitBox


jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r690620725



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -113,7 +113,7 @@ class BrokerServer(
 
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
 
-  var replicaManager: ReplicaManager = null
+  @volatile private[this] var _replicaManager: ReplicaManager = null

Review comment:
   Done.








[GitHub] [kafka] omkreddy merged pull request #11224: KAFKA-13209: Upgrade jetty-server to fix CVE-2021-34429

2021-08-17 Thread GitBox


omkreddy merged pull request #11224:
URL: https://github.com/apache/kafka/pull/11224


   






[GitHub] [kafka] venkatesh010 commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails

2021-08-17 Thread GitBox


venkatesh010 commented on pull request #6233:
URL: https://github.com/apache/kafka/pull/6233#issuecomment-900514731










[GitHub] [kafka] rondagostino commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails

2021-08-17 Thread GitBox


rondagostino commented on pull request #6233:
URL: https://github.com/apache/kafka/pull/6233#issuecomment-900457425


   .. and yes, restarting the client will get the client out of the bad state 
if you happen to be using a version of the code with this bug.






[GitHub] [kafka] rondagostino commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails

2021-08-17 Thread GitBox


rondagostino commented on pull request #6233:
URL: https://github.com/apache/kafka/pull/6233#issuecomment-900456795


   @venkatesh010 https://issues.apache.org/jira/browse/KAFKA-7902 says `Fix 
Version/s: 2.2.0, 2.1.1`
   






[GitHub] [kafka] jolshan opened a new pull request #11224: KAFKA-13209: Upgrade jetty-server to fix CVE-2021-34429

2021-08-17 Thread GitBox


jolshan opened a new pull request #11224:
URL: https://github.com/apache/kafka/pull/11224


   Upgrading to 9.4.43.v20210629
   Release notes: 
https://github.com/eclipse/jetty.project/releases/tag/jetty-9.4.43.v20210629
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] rajinisivaram merged pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher

2021-08-17 Thread GitBox


rajinisivaram merged pull request #11221:
URL: https://github.com/apache/kafka/pull/11221


   






[GitHub] [kafka] rajinisivaram commented on pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher

2021-08-17 Thread GitBox


rajinisivaram commented on pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#issuecomment-900433010


   @dajac @hachikuji Thanks for the reviews, merging.






[jira] [Updated] (KAFKA-13209) Upgrade jetty-server to fix CVE-2021-34429

2021-08-17 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-13209:
---
Summary: Upgrade jetty-server to fix CVE-2021-34429  (was: Upgrade 
jetty-server to 9.4.43.v20210629)

> Upgrade jetty-server to fix CVE-2021-34429
> --
>
> Key: KAFKA-13209
> URL: https://issues.apache.org/jira/browse/KAFKA-13209
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Priority: Major
>






[jira] [Created] (KAFKA-13209) Upgrade jetty-server to 9.4.43.v20210629

2021-08-17 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-13209:
--

 Summary: Upgrade jetty-server to 9.4.43.v20210629
 Key: KAFKA-13209
 URL: https://issues.apache.org/jira/browse/KAFKA-13209
 Project: Kafka
  Issue Type: Improvement
Reporter: Justine Olshan








[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-17 Thread GitBox


junrao commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r690492221



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -113,7 +113,7 @@ class BrokerServer(
 
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
 
-  var replicaManager: ReplicaManager = null
+  @volatile private[this] var _replicaManager: ReplicaManager = null

Review comment:
   Could you make the same change for KafkaServer too?








[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-17 Thread GitBox


jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r690455391



##
File path: core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
##
@@ -410,6 +421,32 @@ class KRaftClusterTest {
 }
   }
 
+  private def checkReplicaManager(cluster: KafkaClusterTestKit, 
expectedHosting: List[(Int, List[Boolean])]): Unit = {
+for ((brokerId, partitionsIsHosted) <- expectedHosting) {
+  val broker = cluster.brokers().get(brokerId)
+  // lock and unlock so we can read the replica manager

Review comment:
   I cleaned this up by just making `var replicaManager` volatile.








[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-17 Thread GitBox


jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r690442367



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -2074,48 +2073,23 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
-  private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
-: (mutable.HashMap[TopicPartition, Boolean],
-   mutable.HashMap[TopicPartition, LocalLeaderInfo],
-   mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
-val deleted = new mutable.HashMap[TopicPartition, Boolean]()
-delta.deletedTopicIds().forEach { topicId =>
-  val topicImage = delta.image().getTopic(topicId)
-  topicImage.partitions().keySet().forEach { partitionId =>
-deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
-  }
-}
-val newLocalLeaders = new mutable.HashMap[TopicPartition, 
LocalLeaderInfo]()
-val newLocalFollowers = new mutable.HashMap[TopicPartition, 
LocalLeaderInfo]()
-delta.changedTopics().values().forEach { topicDelta =>
-  topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
-newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
-  LocalLeaderInfo(topicDelta.id(), e.getValue))
-  }
-  topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
-newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
-  LocalLeaderInfo(topicDelta.id(), e.getValue))
-  }
-}
-(deleted, newLocalLeaders, newLocalFollowers)
-  }
-
   /**
* Apply a KRaft topic change delta.
*
* @param newImageThe new metadata image.
* @param delta   The delta to apply.
*/
   def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
-// Before taking the lock, build some hash maps that we will need.
-val (deleted, newLocalLeaders, newLocalFollowers) = 
calculateDeltaChanges(delta)
+// Before taking the lock, compute the local changes
+val localChanges = delta.localChanges(config.nodeId)
 
 replicaStateChangeLock.synchronized {
   // Handle deleted partitions. We need to do this first because we might 
subsequently
   // create new partitions with the same names as the ones we are deleting 
here.
-  if (!deleted.isEmpty) {
-stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).")
-stopPartitions(deleted).foreach { case (topicPartition, e) =>
+  if (!localChanges.deletes.isEmpty) {

Review comment:
   @junrao, I added a test for this and split up the existing test.








[GitHub] [kafka] dajac commented on a change in pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher

2021-08-17 Thread GitBox


dajac commented on a change in pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#discussion_r690383569



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -262,22 +263,29 @@ abstract class AbstractFetcherThread(name: String,
 val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
 fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
-  Errors.forCode(leaderEpochOffset.errorCode) match {
-case Errors.NONE =>
-  val offsetTruncationState = getOffsetTruncationState(tp, 
leaderEpochOffset)
-  info(s"Truncating partition $tp with $offsetTruncationState due to 
leader epoch and offset $leaderEpochOffset")
-  if (doTruncate(tp, offsetTruncationState))
-fetchOffsets.put(tp, offsetTruncationState)
-
-case Errors.FENCED_LEADER_EPOCH =>
-  val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
-.map(epochEndOffset => 
Int.box(epochEndOffset.currentLeaderEpoch)).asJava
-  if (onPartitionFenced(tp, currentLeaderEpoch))
+  if (partitionStates.contains(tp)) {
+Errors.forCode(leaderEpochOffset.errorCode) match {
+  case Errors.NONE =>
+val offsetTruncationState = getOffsetTruncationState(tp, 
leaderEpochOffset)
+info(s"Truncating partition $tp with $offsetTruncationState due to 
leader epoch and offset $leaderEpochOffset")
+if (doTruncate(tp, offsetTruncationState))
+  fetchOffsets.put(tp, offsetTruncationState)
+
+  case Errors.FENCED_LEADER_EPOCH =>
+val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
+  .map(epochEndOffset => 
Int.box(epochEndOffset.currentLeaderEpoch)).asJava
+if (onPartitionFenced(tp, currentLeaderEpoch))
+  partitionsWithError += tp
+
+  case error =>
+info(s"Retrying leaderEpoch request for partition $tp as the 
leader reported an error: $error")
 partitionsWithError += tp
-
-case error =>
-  info(s"Retrying leaderEpoch request for partition $tp as the leader 
reported an error: $error")
-  partitionsWithError += tp
+}
+  } else {
+// Partitions may have been removed from the fetcher while the thread 
was waiting for fetch
+// response. Removed partitions are filtered out while holding 
`partitionMapLock` to ensure that we
+// don't update state for any partition that may have already been 
migrated to another thread.
+trace(s"Ignoring epoch offsets for partition '$tp' since it has been 
removed from this fetcher thread.")

Review comment:
   nit: I would remove the single quotes around `tp` to remain consistent 
with the other logs above (or the other way around).








[jira] [Assigned] (KAFKA-12421) Improve controller's atomic grouping

2021-08-17 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao reassigned KAFKA-12421:
---

Assignee: HaiyuanZhao

> Improve controller's atomic grouping
> 
>
> Key: KAFKA-12421
> URL: https://issues.apache.org/jira/browse/KAFKA-12421
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: Jose Armando Garcia Sancio
>Assignee: HaiyuanZhao
>Priority: Major
>  Labels: kip-500
>
> The current controller implementation atomically appends to the metadata log 
> by making sure that all required records are in the same batch. The 
> controller groups all of the records that result from an RPC into one batch. 
> Some of the RPCs are:
>  # Client quota changes
>  # Configuration changes
>  # Feature changes
>  # Topic creation
> This is good enough for correctness but it is more aggressive than necessary. 
> For example, for topic creation since errors are reported independently, the 
> controller only needs to guarantee that all of the records for one topic are 
> committed atomically.





[jira] [Updated] (KAFKA-13196) MirrorMaker 2 not always start tasks

2021-08-17 Thread Jozef Vilcek (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated KAFKA-13196:
-
Description: 
I am using MirrorMaker 2.0 and running it via the [ 
MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java]
 class. This entry point will start up a `DistributedHerder` that uses a 
non-functional `advertisedUrl`, and therefore workers cannot talk to each 
other and coordinate.

After upgrading from version `2.4.0` to `2.7.1` I noticed that when I am 
starting up MirrorMaker it does not always start tasks - just the connector is 
running. Some number of stops/starts will eventually start the tasks too.

After a bit of digging I noticed that, in the attempt to configure the connector's 
tasks, the code ends up in this [1] branch, where the configure request is 
forwarded to the leader. For some reason, task configuration is not done on the 
leader. However, MirrorMaker does not include a RestServer, and therefore that 
request will never succeed.

I am not sure what is going on or why it seems to work better on 2.4.0. I 
noticed that the connector start procedure involved fewer callbacks on 2.4.0 in 
the connector start sequence [2].

 

 

[1] 
[https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494]

[2] 
[https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236]

  was:
I am using MirrorMaker 2.0 and running it via [ 
MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java]
 class. This method will start up `DistributedHerder` and will use 
non-functional `advertisedUrl`, and therefore workers can not talk to each 
other and coordinate.

After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am 
starting up mirror maker ti does not always start tasks - just connector is 
executing. Doing some amount of stop and start it will eventually start tasks 
too.

After a bit of digging I did noticed that in attempt to configure connector's 
task, code ends up in this [1] branch, where configure request is being 
forwarded to the leader. From some reason, task configuration is not done on 
leader. However, MirrorMaker does not pack RestServer and therefore that 
request will never succeed.

I am not sure what is going no or why it does seem to work better on 2.4.0. I 
noticed that connector start procedure did involve less callbacks on 2.4.0 in 
connector start sequence [2].

 

 

[1] 
[https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494]

[2] 
[https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236]


> MirrorMaker 2 not always start tasks
> 
>
> Key: KAFKA-13196
> URL: https://issues.apache.org/jira/browse/KAFKA-13196
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.1
>Reporter: Jozef Vilcek
>Priority: Major
>
> I am using MirrorMaker 2.0 and running it via the [ 
> MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java]
>  class. This entry point will start up a `DistributedHerder` that uses a 
> non-functional `advertisedUrl`, and therefore workers cannot talk to each 
> other and coordinate.
> After upgrading from version `2.4.0` to `2.7.1` I noticed that when I am 
> starting up MirrorMaker it does not always start tasks - just the connector is 
> running. Some number of stops/starts will eventually start the tasks too.
> After a bit of digging I noticed that, in the attempt to configure the connector's 
> tasks, the code ends up in this [1] branch, where the configure request is 
> forwarded to the leader. For some reason, task configuration is not done on the 
> leader. However, MirrorMaker does not include a RestServer, and therefore that 
> request will never succeed.
> I am not sure what is going on or why it seems to work better on 2.4.0. I 
> noticed that the connector start procedure involved fewer callbacks on 2.4.0 in 
> the connector start sequence [2].
>  
>  
> [1] 
> [https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494]
> [2] 
> [https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236]





[GitHub] [kafka] rajinisivaram commented on a change in pull request #11221: KAFKA-13207: Don't update partition state on fetch response with diverging epoch if partition removed from fetcher

2021-08-17 Thread GitBox


rajinisivaram commented on a change in pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#discussion_r690150564



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -268,7 +262,10 @@ abstract class AbstractFetcherThread(name: String,
 val fetchOffsets = mutable.HashMap.empty[TopicPartition, 
OffsetTruncationState]
 val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
-fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
+// Partitions may have been removed from the fetcher while the thread was 
waiting for fetch
+// response. Filter out removed partitions while holding 
`partitionMapLock` to ensure that we
+// don't update state for any partition that may have already been 
migrated to another thread.
+fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) 
}.forKeyValue { (tp, leaderEpochOffset) =>

Review comment:
   @hachikuji Thanks for the review, updated.








[GitHub] [kafka] venkatesh010 commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails

2021-08-17 Thread GitBox


venkatesh010 commented on pull request #6233:
URL: https://github.com/apache/kafka/pull/6233#issuecomment-900070176


   And is restarting service a temporary solution for it? As loginContext will 
get created again post restart of client






[GitHub] [kafka] venkatesh010 commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails

2021-08-17 Thread GitBox


venkatesh010 commented on pull request #6233:
URL: https://github.com/apache/kafka/pull/6233#issuecomment-900069794


   @rondagostino @rajinisivaram  which version of apache Kafka client or if I'm 
using spring cloud stream, which version of spring-kafka should I use..to 
incorporate this fix?






[jira] [Assigned] (KAFKA-13032) Impossible stacktrace

2021-08-17 Thread Yanwen Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanwen Lin reassigned KAFKA-13032:
--

Assignee: Yanwen Lin

> Impossible stacktrace
> -
>
> Key: KAFKA-13032
> URL: https://issues.apache.org/jira/browse/KAFKA-13032
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Niclas Hedhman
>Assignee: Yanwen Lin
>Priority: Minor
>  Labels: beginner, easy-fix
>
> I am presented with a stacktrace that has not a single touch point in my 
> code, so it is incredibly difficult to figure out where the problem could be. 
> I think more RuntimeExceptions need to be caught, pulling out information at 
> each level that provides any additional hint of where we are.
> For instance, each node could prepend its reference/name and one would have a 
> chance to see where we are...
> ```
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_140, processor=KSTREAM-SOURCE-00, topic=_poll, 
> partition=140, offset=0, stacktrace=java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>   at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:268)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:50)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>   at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at 
> 

[GitHub] [kafka] edipesh19 closed pull request #11223: Upgrade zookeeper version to 3.6.3

2021-08-17 Thread GitBox


edipesh19 closed pull request #11223:
URL: https://github.com/apache/kafka/pull/11223


   






[GitHub] [kafka] edipesh19 opened a new pull request #11223: Upgrade zookeeper version to 3.6.3

2021-08-17 Thread GitBox


edipesh19 opened a new pull request #11223:
URL: https://github.com/apache/kafka/pull/11223


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

