[GitHub] [kafka] rite2nikhil commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rite2nikhil commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r500743939

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ##
@@ -432,14 +455,22 @@ abstract class AbstractFetcherThread(name: String,
 failedPartitions.removeAll(initialFetchStates.keySet)
 initialFetchStates.forKeyValue { (tp, initialFetchState) =>
-// We can skip the truncation step iff the leader epoch matches the existing epoch
+// For IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+// For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch
 val currentState = partitionStates.stateValue(tp)
-val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
+val updatedState = if (initialFetchState.offset >= 0 && isTruncationOnFetchSupported && initialFetchState.lastFetchedEpoch.nonEmpty) {

Review comment:
NIT: Should this be the first check in the if () statement?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init
vvcephei commented on a change in pull request #9388: URL: https://github.com/apache/kafka/pull/9388#discussion_r500715702 ## File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java ## @@ -34,6 +34,7 @@ * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}. */ public class WordCountProcessorTest { +@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437 Review comment: This ticket needs to go in to 2.7.0 also, but I split it out for reviewability. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java ## @@ -45,12 +46,19 @@ public void flush() { throw new UnsupportedOperationException(ERROR_MESSAGE); } +@Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { throw new UnsupportedOperationException(ERROR_MESSAGE); } +@Override +public void init(final StateStoreContext context, + final StateStore root) { +throw new UnsupportedOperationException(ERROR_MESSAGE); Review comment: There are going to be a lot of duplicated init methods. It's not great, but hopefully we can drop the old API before too long. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java ## @@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context) return (StreamsMetricsImpl) context.metrics(); } +/** + * Should be removed as part of KAFKA-10217 + */ +public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) { +return (StreamsMetricsImpl) context.metrics(); +} + public static String changelogFor(final ProcessorContext context, final String storeName) { return context instanceof InternalProcessorContext ? 
((InternalProcessorContext) context).changelogFor(storeName) : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); } + +public static String changelogFor(final StateStoreContext context, final String storeName) { +return context instanceof InternalProcessorContext +? ((InternalProcessorContext) context).changelogFor(storeName) +: ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); +} + +public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) { +if (context instanceof InternalProcessorContext) { +return (InternalProcessorContext) context; +} else { +throw new IllegalArgumentException( +"This component requires internal features of Kafka Streams and must be disabled for unit tests." +); +} +} Review comment: I replaced a lot of casts with this checked-cast method, which also lets us get rid of a lot of similar cast-checking blocks, which were inconsistently applied. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -83,14 +85,40 @@ this.valueSerde = valueSerde; } +@Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { -this.context = context; +this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; taskId = context.taskId().toString(); initStoreSerde(context); streamsMetrics = (StreamsMetricsImpl) context.metrics(); +registerMetrics(); Review comment: I wasn't able to extract out quite as much common code in the Metered implementations because they need to work regardless of whether the context is an InternalProcessorContext or whether it's a straight mock (for unit tests). 
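The checked-cast helper mentioned in the review comment above can be tried in isolation. What follows is a minimal sketch rather than the Kafka Streams code itself: `ProcessorContext` and `InternalProcessorContext` are small stand-in interfaces declared locally so the example compiles on its own, the class name `CheckedCastSketch` is hypothetical, and the changelog name built in `main` is made up for the demonstration.

```java
// Stand-in types for illustration only; the real Kafka Streams interfaces are much larger.
interface ProcessorContext { }

interface InternalProcessorContext extends ProcessorContext {
    String changelogFor(String storeName);
}

final class CheckedCastSketch {
    private CheckedCastSketch() { }

    // Returns the context as the internal type, or fails with a descriptive message
    // instead of letting a ClassCastException surface at some unrelated call site.
    static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
        if (context instanceof InternalProcessorContext) {
            return (InternalProcessorContext) context;
        }
        throw new IllegalArgumentException(
            "This component requires internal features of Kafka Streams and must be disabled for unit tests."
        );
    }

    public static void main(final String[] args) {
        // Hypothetical internal context whose changelog naming is invented for this demo.
        final InternalProcessorContext internal = storeName -> "my-app-" + storeName + "-changelog";
        System.out.println(asInternalProcessorContext(internal).changelogFor("counts"));

        try {
            // A plain mock-like context is rejected by the checked cast.
            asInternalProcessorContext(new ProcessorContext() { });
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Centralizing the check in one method keeps the error message consistent, which appears to be the motivation given in the comment for replacing the scattered cast-checking blocks.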
## File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java ## @@ -65,7 +65,11 @@ * * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition + * @deprecated Since 2.7.0. Callers should invoke {@link this#init(StateStoreContext, StateStore)} instead. + * Implementers may choose to implement this method for backward compatibility or to throw an + * informative exception instead. */ +@Deprecated Review comment: Adding the deprecation tag right now lets us be sure we encountered all places this method appears in the codebase. ## File path:
[GitHub] [kafka] vvcephei opened a new pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init
vvcephei opened a new pull request #9388:
URL: https://github.com/apache/kafka/pull/9388

* all wrapping stores should pass StateStoreContext init through to the same method on the wrapped store and not translate it to ProcessorContext init
* base-level stores should handle StateStoreContext init so that callers passing a non-InternalProcessorContext implementation will be able to initialize the store
* extra tests are added to verify the desired behavior

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
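The first point of the pull request description (a wrapping store forwards each `init` overload to the same overload on the wrapped store) can be sketched as follows. This is an illustration under assumptions, not the code from the PR: the three interfaces are local stand-ins reduced to the two `init` signatures quoted in the review, and `RecordingStore` and `WrappingStore` are hypothetical names.

```java
// Local stand-ins; the real Kafka Streams types carry many more methods.
interface ProcessorContext { }

interface StateStoreContext { }

interface StateStore {
    @Deprecated
    void init(ProcessorContext context, StateStore root);

    void init(StateStoreContext context, StateStore root);
}

// Innermost store that simply records which overload reached it.
final class RecordingStore implements StateStore {
    String lastInit = "none";

    @Deprecated
    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        lastInit = "ProcessorContext";
    }

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        lastInit = "StateStoreContext";
    }
}

// Wrapper that passes each overload straight through without translating one into the other.
final class WrappingStore implements StateStore {
    private final StateStore wrapped;

    WrappingStore(final StateStore wrapped) {
        this.wrapped = wrapped;
    }

    @Deprecated
    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        wrapped.init(context, root);
    }

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        wrapped.init(context, root);
    }
}
```

With this shape, a caller that initializes the outer store with a `StateStoreContext` reaches the inner store's `StateStoreContext` overload, which is the behavior the extra tests in the PR are described as verifying.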
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10437:
--------------------------------
    Description:
In addition to implementing the KIP, search for and resolve these todos:
{color:#008dde}TODO will be fixed in KAFKA-10437{color}
Also, add unit tests in test-utils making sure we can initialize _all_ the kinds of store with the MPC and MPC.getSSC.

  was:
In addition to implementing the KIP, search for and resolve these todos:
{color:#008dde}TODO will be fixed in KAFKA-10437{color}

> KIP-478: Implement test-utils changes
> -------------------------------------
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
> Fix For: 2.7.0
>
> In addition to implementing the KIP, search for and resolve these todos:
> {color:#008dde}TODO will be fixed in KAFKA-10437{color}
> Also, add unit tests in test-utils making sure we can initialize _all_ the
> kinds of store with the MPC and MPC.getSSC.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10437:
--------------------------------
    Description:
In addition to implementing the KIP, search for and resolve these todos:
{color:#008dde}TODO will be fixed in KAFKA-10437{color}

> KIP-478: Implement test-utils changes
> -------------------------------------
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
> Fix For: 2.7.0
>
> In addition to implementing the KIP, search for and resolve these todos:
> {color:#008dde}TODO will be fixed in KAFKA-10437{color}
[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
ableegoldman commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r500682632

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -364,6 +370,73 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
 }
 }
+/**
+ * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+ * logic.
+ *
+ * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+ */
+public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = throwable -> handleStreamsUncaughtException(throwable, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+for (final StreamThread thread : threads) {

Review comment:
If we want to use the same StreamsUncaughtExceptionHandler so users don't have to implement two different handlers, which sounds right to me, then maybe we should add a parameter to the `handle` method to indicate whether it's a global or stream thread. Or split into a separate `handleStreamThreadException` and `handleGlobalThreadException` methods, or something like that (and possibly rename `SHUTDOWN_STREAM_THREAD` to `SHUTDOWN_THREAD`) WDYT?
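The first option raised in the review comment above (one handler whose `handle` method also receives the kind of thread that failed) might look roughly like the sketch below. Every name here is hypothetical: `ThreadKind`, `HandlerResponse`, `ThreadAwareExceptionHandler` and the example policy are invented for illustration and are not part of the PR or of Kafka Streams.

```java
// Hypothetical names throughout; none of these types are taken from Kafka Streams.
enum ThreadKind { STREAM_THREAD, GLOBAL_THREAD }

enum HandlerResponse { SHUTDOWN_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

// A single handler interface that is told which kind of thread threw the exception.
interface ThreadAwareExceptionHandler {
    HandlerResponse handle(Throwable exception, ThreadKind threadKind);
}

final class ThreadAwareHandlerSketch {
    private ThreadAwareHandlerSketch() { }

    // Stateless, so it can be shared among all threads without extra synchronization.
    // The policy itself is arbitrary and only demonstrates branching on the thread kind.
    static final ThreadAwareExceptionHandler EXAMPLE_POLICY = (exception, threadKind) -> {
        if (threadKind == ThreadKind.GLOBAL_THREAD) {
            return HandlerResponse.SHUTDOWN_CLIENT;
        }
        return exception instanceof IllegalStateException
            ? HandlerResponse.SHUTDOWN_APPLICATION
            : HandlerResponse.SHUTDOWN_THREAD;
    };
}
```

The alternative mentioned in the comment, two separate methods on the same interface, would trade the extra parameter for one method per thread kind; the sketch does not argue for either choice.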
[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
ableegoldman commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r500682039

## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+/**
+ * Inspect a record and the exception received.
+ * @param exception the actual exception
+ */
+StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Throwable exception);

Review comment:
Should we consider also passing in the thread name?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
ableegoldman commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r500681912 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { +/** + * Inspect a record and the exception received. + * @param exception the actual exception + */ +StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Throwable exception); + +/** + * Enumeration that describes the response from the exception handler. 
+ */
+enum StreamsUncaughtExceptionHandlerResponse {
+
+
+SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"),
+//REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"),
+SHUTDOWN_KAFKA_STREAMS_CLIENT(2, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
+SHUTDOWN_KAFKA_STREAMS_APPLICATION(3, "SHUTDOWN_KAFKA_STREAMS_APPLICATION");
+
+
+/** an english description of the api--this is for debugging and can change */
+public final String name;
+
+/** the permanent and immutable id of an API--this can't change ever */
+public final int id;
+
+StreamsUncaughtExceptionHandlerResponse(final int id, final String name) {

Review comment:
Oh, sorry, I thought this was the `handle` method. Ignore me
[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
ableegoldman commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r500681512

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -364,6 +370,73 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
 }
 }
+/**
+ * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+ * logic.
+ *
+ * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+ */
+public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = throwable -> handleStreamsUncaughtException(throwable, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+for (final StreamThread thread : threads) {

Review comment:
What's our plan for the global thread? I didn't think of this during the KIP discussion, and sorry if it was brought up there and I just forgot about it. But it seems like we should still give users a non-deprecated way to set a handler for the global thread.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
ableegoldman commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r500681407 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { +/** + * Inspect a record and the exception received. + * @param exception the actual exception + */ +StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Throwable exception); + +/** + * Enumeration that describes the response from the exception handler. 
+ */
+enum StreamsUncaughtExceptionHandlerResponse {
+
+
+SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"),
+//REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"),
+SHUTDOWN_KAFKA_STREAMS_CLIENT(2, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
+SHUTDOWN_KAFKA_STREAMS_APPLICATION(3, "SHUTDOWN_KAFKA_STREAMS_APPLICATION");
+
+
+/** an english description of the api--this is for debugging and can change */
+public final String name;
+
+/** the permanent and immutable id of an API--this can't change ever */
+public final int id;
+
+StreamsUncaughtExceptionHandlerResponse(final int id, final String name) {

Review comment:
What is the `id`? And what is the `name`, is that the thread name? If so, can we clarify this?
[jira] [Assigned] (KAFKA-10578) Convert KTable to a KStream using the previous value
[ https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-10578: --- Assignee: Javier Freire Riobó > Convert KTable to a KStream using the previous value > > > Key: KAFKA-10578 > URL: https://issues.apache.org/jira/browse/KAFKA-10578 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Javier Freire Riobó >Assignee: Javier Freire Riobó >Priority: Minor > Labels: kip > > Imagine that we have an entity for which we want to emit the difference > between the current and the previous state. The simplest case would be that > the entity was an integer number and you want to emit the subtraction between > the current and previous values. > For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 > (3 - 6) is expected. > The way to achieve this with kafka streams would be through an aggregate. > The main problem, apart from needing more code, is that if the same event is > received twice at the same time and the commit time is not 0, the difference > is deleted and nothing is emitted. > KIP-675: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value] -- This message was sent by Atlassian Jira (v8.3.4#803005)
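The arithmetic in the issue description (input 4, 6, 3 yields 4, 2, -3) can be checked with a few lines of plain Java. This is only a worked example of the subtraction, assuming an initial previous value of 0 as in the description; it does not use the Kafka Streams API and does not model the duplicate-event problem the ticket describes. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PreviousValueDiff {
    private PreviousValueDiff() { }

    // Emits current - previous for each value, starting from a previous value of 0.
    static List<Integer> differences(final List<Integer> values) {
        final List<Integer> output = new ArrayList<>();
        int previous = 0;
        for (final int current : values) {
            output.add(current - previous);
            previous = current;
        }
        return output;
    }

    public static void main(final String[] args) {
        // 4 - 0 = 4, 6 - 4 = 2, 3 - 6 = -3
        System.out.println(differences(Arrays.asList(4, 6, 3))); // prints [4, 2, -3]
    }
}
```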
[jira] [Commented] (KAFKA-10578) Convert KTable to a KStream using the previous value
[ https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17209253#comment-17209253 ]

Matthias J. Sax commented on KAFKA-10578:
-----------------------------------------
[~javier.freire] – I added you to the list of contributors and assigned the ticket to you. You can now also self-assign tickets.

> Convert KTable to a KStream using the previous value
> ----------------------------------------------------
>
> Key: KAFKA-10578
> URL: https://issues.apache.org/jira/browse/KAFKA-10578
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Javier Freire Riobó
> Priority: Minor
> Labels: kip
>
> Imagine that we have an entity for which we want to emit the difference
> between the current and the previous state. The simplest case would be that
> the entity was an integer number and you want to emit the subtraction between
> the current and previous values.
> For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3
> (3 - 6) is expected.
> The way to achieve this with kafka streams would be through an aggregate.
> The main problem, apart from needing more code, is that if the same event is
> received twice at the same time and the commit time is not 0, the difference
> is deleted and nothing is emitted.
> KIP-675:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value]
[jira] [Updated] (KAFKA-10578) Convert KTable to a KStream using the previous value
[ https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10578: Summary: Convert KTable to a KStream using the previous value (was: KIP-675: Convert KTable to a KStream using the previous value) > Convert KTable to a KStream using the previous value > > > Key: KAFKA-10578 > URL: https://issues.apache.org/jira/browse/KAFKA-10578 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Javier Freire Riobó >Priority: Minor > Labels: kip > > Imagine that we have an entity for which we want to emit the difference > between the current and the previous state. The simplest case would be that > the entity was an integer number and you want to emit the subtraction between > the current and previous values. > For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 > (3 - 6) is expected. > The way to achieve this with kafka streams would be through an aggregate. > The main problem, apart from needing more code, is that if the same event is > received twice at the same time and the commit time is not 0, the difference > is deleted and nothing is emitted. > KIP-675: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10578) KIP-675: Convert KTable to a KStream using the previous value
[ https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10578: Description: Imagine that we have an entity for which we want to emit the difference between the current and the previous state. The simplest case would be that the entity was an integer number and you want to emit the subtraction between the current and previous values. For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 (3 - 6) is expected. The way to achieve this with kafka streams would be through an aggregate. The main problem, apart from needing more code, is that if the same event is received twice at the same time and the commit time is not 0, the difference is deleted and nothing is emitted. KIP-675: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value] was: Imagine that we have an entity for which we want to emit the difference between the current and the previous state. The simplest case would be that the entity was an integer number and you want to emit the subtraction between the current and previous values. For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 (3 - 6) is expected. The way to achieve this with kafka streams would be through an aggregate. The main problem, apart from needing more code, is that if the same event is received twice at the same time and the commit time is not 0, the difference is deleted and nothing is emitted. > KIP-675: Convert KTable to a KStream using the previous value > - > > Key: KAFKA-10578 > URL: https://issues.apache.org/jira/browse/KAFKA-10578 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Javier Freire Riobó >Priority: Minor > Labels: kip > > Imagine that we have an entity for which we want to emit the difference > between the current and the previous state. 
The simplest case would be that > the entity was an integer number and you want to emit the subtraction between > the current and previous values. > For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 > (3 - 6) is expected. > The way to achieve this with kafka streams would be through an aggregate. > The main problem, apart from needing more code, is that if the same event is > received twice at the same time and the commit time is not 0, the difference > is deleted and nothing is emitted. > KIP-675: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10578) KIP-675: Convert KTable to a KStream using the previous value
[ https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10578: Labels: kip (was: ) > KIP-675: Convert KTable to a KStream using the previous value > - > > Key: KAFKA-10578 > URL: https://issues.apache.org/jira/browse/KAFKA-10578 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Javier Freire Riobó >Priority: Minor > Labels: kip > > Imagine that we have an entity for which we want to emit the difference > between the current and the previous state. The simplest case would be that > the entity was an integer number and you want to emit the subtraction between > the current and previous values. > For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 > (3 - 6) is expected. > The way to achieve this with kafka streams would be through an aggregate. > The main problem, apart from needing more code, is that if the same event is > received twice at the same time and the commit time is not 0, the difference > is deleted and nothing is emitted. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore
ableegoldman commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r500664927

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ##
@@ -278,6 +326,23 @@ public void shouldFetchCorrectlyAcrossSegments() {
 assertFalse(results.hasNext());
 }
+@Test
+public void shouldBackwardFetchCorrectlyAcrossSegments() {
+final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+cachingStore.put(a1, "1".getBytes());
+cachingStore.put(a2, "2".getBytes());
+cachingStore.flush();
+cachingStore.put(a3, "3".getBytes());

Review comment:
Can we add a few more records that span multiple segments that don't get flushed as well?

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ##
@@ -301,6 +366,29 @@ public void shouldFetchRangeCorrectlyAcrossSegments() {
 assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);
 }
+@Test
+public void shouldBackwardFetchRangeCorrectlyAcrossSegments() {
+final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+cachingStore.put(a1, "1".getBytes());
+cachingStore.put(aa1, "1".getBytes());
+cachingStore.put(a2, "2".getBytes());
+cachingStore.put(a3, "3".getBytes());
+cachingStore.put(aa3, "3".getBytes());
+
+final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults =
+cachingStore.backwardFindSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
+final Set<Windowed<Bytes>> keys = new HashSet<>();
+while (rangeResults.hasNext()) {
+keys.add(rangeResults.next().key);
+}
+rangeResults.close();
+assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);

Review comment:
We're losing the ordering check by comparing this as a set, let's use a list (or whatever) to verify the actual order

## File path: streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java ##
@@ -82,11 +118,15 @@ public boolean hasNext() {
 public KeyValue<Windowed<K>, V> next() {
 return it.next();
 }
-
 }
 );
 }
+@Override
+public KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to) {
+return null;

Review comment:
I guess it probably doesn't matter since we presumably aren't using these backward methods of the ReadOnlySessionStoreStub, but it seems like it might result in some tricky NPEs to debug if ever someone does try to use it in a test. If you don't feel like implementing it I think it's fine to just throw UnsupportedOperationException and say that you'll have to implement this to use it. Or just copy the code from the forward direction and flip it 🤷‍♀️
Same goes for all the methods in here that return null

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ##
@@ -456,68 +562,88 @@ public void shouldClearNamespaceCacheOnClose() {
 assertEquals(0, cache.size());
 }
-@Test(expected = InvalidStateStoreException.class)
+@Test
 public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
 cachingStore.close();
-cachingStore.fetch(keyA);
+assertThrows(InvalidStateStoreException.class, () -> cachingStore.fetch(keyA));
 }
-@Test(expected = InvalidStateStoreException.class)
+@Test
 public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() {
 cachingStore.close();
-cachingStore.findSessions(keyA, 0, Long.MAX_VALUE);
+assertThrows(InvalidStateStoreException.class, () -> cachingStore.findSessions(keyA, 0, Long.MAX_VALUE));
 }
-@Test(expected = InvalidStateStoreException.class)
+@Test
 public void shouldThrowIfTryingToRemoveFromClosedCachingStore() {
 cachingStore.close();
-cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0)));
+assertThrows(InvalidStateStoreException.class, () -> cachingStore.remove(new Windowed<>(keyA, new
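The review comment in this message about losing the ordering check can be illustrated without any Kafka classes. The sketch below is a plain-Java demonstration, with plain strings standing in for the windowed keys and a hypothetical helper `toList`: it shows that a set comparison accepts results in the wrong order, while a list comparison does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

final class OrderCheckSketch {
    private OrderCheckSketch() { }

    // Collects iterator results in encounter order so the order itself can be asserted.
    static <T> List<T> toList(final Iterator<T> iterator) {
        final List<T> results = new ArrayList<>();
        while (iterator.hasNext()) {
            results.add(iterator.next());
        }
        return results;
    }

    public static void main(final String[] args) {
        // Strings stand in for the windowed keys used by the real test.
        final List<String> expectedBackward = Arrays.asList("a3", "a2", "a1");
        final List<String> wrongOrder = Arrays.asList("a1", "a2", "a3");

        // Comparing as sets cannot tell the two orders apart.
        System.out.println(new HashSet<>(toList(wrongOrder.iterator())).equals(new HashSet<>(expectedBackward)));
        // Comparing as lists detects the wrong order.
        System.out.println(toList(wrongOrder.iterator()).equals(expectedBackward));
    }
}
```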
[GitHub] [kafka] guozhangwang commented on pull request #8988: KAFKA-10199: Separate restore threads
guozhangwang commented on pull request #8988: URL: https://github.com/apache/kafka/pull/8988#issuecomment-704615880 test this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8988: KAFKA-10199: Separate restore threads
guozhangwang commented on pull request #8988: URL: https://github.com/apache/kafka/pull/8988#issuecomment-704615342 System test: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4195/
[GitHub] [kafka] ableegoldman commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore
ableegoldman commented on a change in pull request #9139: URL: https://github.com/apache/kafka/pull/9139#discussion_r500658560 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ## @@ -270,25 +335,32 @@ public void close() { private CacheIteratorWrapper(final Bytes key, final long earliestSessionEndTime, - final long latestSessionStartTime) { -this(key, key, earliestSessionEndTime, latestSessionStartTime); + final long latestSessionStartTime, + final boolean forward) { +this(key, key, earliestSessionEndTime, latestSessionStartTime, forward); } private CacheIteratorWrapper(final Bytes keyFrom, final Bytes keyTo, final long earliestSessionEndTime, - final long latestSessionStartTime) { + final long latestSessionStartTime, + final boolean forward) { this.keyFrom = keyFrom; this.keyTo = keyTo; this.latestSessionStartTime = latestSessionStartTime; this.lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp); this.segmentInterval = cacheFunction.getSegmentInterval(); +this.forward = forward; this.currentSegmentId = cacheFunction.segmentId(earliestSessionEndTime); Review comment: Ok I _think_ that for the reverse case, this should be initialized to `cacheFunction.segmentId(maxObservedTimestamp)` and `lastSegmentId` should be initialized to this (`segmentId(earliestSessionEndTime)`).
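A minimal sketch of the initialization suggested in the comment above, assuming for illustration that a segment ID is simply `timestamp / segmentInterval` (the real mapping lives in `cacheFunction` and may differ). `SegmentBoundsSketch`, `SegmentBounds`, and `boundsFor` are hypothetical names, not part of `CachingSessionStore`.

```java
// Hypothetical sketch; names and the segmentId formula are assumptions for illustration.
class SegmentBoundsSketch {

    static final class SegmentBounds {
        final long currentSegmentId; // segment the iterator starts in
        final long lastSegmentId;    // segment the iterator finishes in

        SegmentBounds(final long currentSegmentId, final long lastSegmentId) {
            this.currentSegmentId = currentSegmentId;
            this.lastSegmentId = lastSegmentId;
        }
    }

    // Assumed mapping from a timestamp to a segment ID.
    static long segmentId(final long timestamp, final long segmentInterval) {
        return timestamp / segmentInterval;
    }

    // Forward iteration walks from the earliest segment to the latest;
    // reverse iteration swaps the two bounds, as the review comment proposes.
    static SegmentBounds boundsFor(final long earliestSessionEndTime,
                                   final long maxObservedTimestamp,
                                   final long segmentInterval,
                                   final boolean forward) {
        final long earliest = segmentId(earliestSessionEndTime, segmentInterval);
        final long latest = segmentId(maxObservedTimestamp, segmentInterval);
        return forward ? new SegmentBounds(earliest, latest) : new SegmentBounds(latest, earliest);
    }

    public static void main(final String[] args) {
        final SegmentBounds forward = boundsFor(100L, 950L, 100L, true);
        final SegmentBounds reverse = boundsFor(100L, 950L, 100L, false);
        System.out.println(forward.currentSegmentId + " -> " + forward.lastSegmentId);
        System.out.println(reverse.currentSegmentId + " -> " + reverse.lastSegmentId);
    }
}
```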
[GitHub] [kafka] kowshik commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on pull request #9001: URL: https://github.com/apache/kafka/pull/9001#issuecomment-704596581 @junrao The test failure in `MirrorConnectorsIntegrationTest.testReplication` does not seem related. I have now rebased the PR against the latest AK trunk; I'd like to see if the failure happens again.
[jira] [Assigned] (KAFKA-10024) Add dynamic configuration and enforce quota for per-IP connection rate limits
[ https://issues.apache.org/jira/browse/KAFKA-10024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anna Povzner reassigned KAFKA-10024: Assignee: David Mao (was: Anna Povzner) > Add dynamic configuration and enforce quota for per-IP connection rate limits > - > > Key: KAFKA-10024 > URL: https://issues.apache.org/jira/browse/KAFKA-10024 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Anna Povzner >Assignee: David Mao >Priority: Major > Labels: features > > This JIRA is for the second part of KIP-612 – Add per-IP connection creation > rate limits. > As described here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
ableegoldman commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-704587824 I don't think they're fixed on trunk lol (besides, I believe Jenkins merges the PR with trunk before running the tests so it's running the most recent code anyways). None of the failures seem to be related to this PR so I wouldn't worry about it. I think this PR is ready to be merged
[GitHub] [kafka] rgo edited a comment on pull request #2541: KAFKA-4759: Add support for subnet masks in SimpleACLAuthorizer
rgo edited a comment on pull request #2541: URL: https://github.com/apache/kafka/pull/2541#issuecomment-704547984 I've been working on it. I'm going to open a new PR and I'll link it to this one. PR: #9387
[GitHub] [kafka] rgo opened a new pull request #9387: KAFKA-4759: Acl authorizer subnet support
rgo opened a new pull request #9387: URL: https://github.com/apache/kafka/pull/9387 Add subnet support to ACL authorizer. For IPv4 and IPv6, it supports: - IP address range - Subnet CIDR notation Test strategy has been simple: - Define ranges and set ACLs with IPs that fall inside each range and others that fall outside it. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
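To make the "Subnet CIDR notation" item concrete, here is a minimal sketch of a CIDR membership check for both IPv4 and IPv6. It is not the code from PR #9387, whose implementation is not shown in this message; `CidrMatchSketch` and `inSubnet` are hypothetical names, and the approach (comparing the leading prefix bits of the raw address bytes) is just one common way to do it.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical sketch; not the ACL authorizer implementation from the PR.
class CidrMatchSketch {

    // Parse an IP literal into its raw bytes (4 for IPv4, 16 for IPv6).
    private static byte[] parse(final String ip) {
        try {
            return InetAddress.getByName(ip).getAddress();
        } catch (final UnknownHostException e) {
            throw new IllegalArgumentException("Not a valid IP address: " + ip, e);
        }
    }

    // Returns true if `ip` lies inside the subnet given in CIDR notation, e.g. "192.168.1.0/24".
    static boolean inSubnet(final String cidr, final String ip) {
        final int slash = cidr.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("Expected CIDR notation: " + cidr);
        }
        final byte[] network = parse(cidr.substring(0, slash));
        final byte[] candidate = parse(ip);
        final int prefixLength = Integer.parseInt(cidr.substring(slash + 1));
        if (prefixLength < 0 || prefixLength > network.length * 8) {
            throw new IllegalArgumentException("Invalid prefix length in " + cidr);
        }
        if (network.length != candidate.length) {
            return false; // an IPv4 address never matches an IPv6 subnet, and vice versa
        }
        // Compare the whole bytes covered by the prefix.
        final int fullBytes = prefixLength / 8;
        for (int i = 0; i < fullBytes; i++) {
            if (network[i] != candidate[i]) {
                return false;
            }
        }
        // Compare the remaining high-order bits of the next byte, if any.
        final int remainingBits = prefixLength % 8;
        if (remainingBits == 0) {
            return true;
        }
        final int mask = (0xFF << (8 - remainingBits)) & 0xFF;
        return (network[fullBytes] & mask) == (candidate[fullBytes] & mask);
    }

    public static void main(final String[] args) {
        System.out.println(inSubnet("192.168.1.0/24", "192.168.1.42"));
        System.out.println(inSubnet("192.168.1.0/24", "192.168.2.1"));
        System.out.println(inSubnet("2001:db8::/32", "2001:db8::1"));
    }
}
```

This mirrors the test strategy described in the PR: pick a range, then check one address inside it and one outside it.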
[GitHub] [kafka] rgo commented on pull request #2541: KAFKA-4759: Add support for subnet masks in SimpleACLAuthorizer
rgo commented on pull request #2541: URL: https://github.com/apache/kafka/pull/2541#issuecomment-704547984 I've been working on it. I'm going to open a new PR and I'll link it to this one.
[jira] [Updated] (KAFKA-10550) Update AdminClient and kafka-topics.sh to support topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-10550: --- Description: Change some AdminClient methods to expose and support topic IDs (describe, delete, return the id on create) Make changes to kafka-topics.sh --describe so a user can specify a topic name to describe with the --topic parameter, or alternatively the user can supply a topic ID with the --topic_id parameter was: Make changes to kafka-topics.sh --describe so a user can specify a topic name to describe with the --topic parameter, or alternatively the user can supply a topic ID with the --topic_id parameter > Update AdminClient and kafka-topics.sh to support topic IDs > --- > > Key: KAFKA-10550 > URL: https://issues.apache.org/jira/browse/KAFKA-10550 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Priority: Major > > Change some AdminClient methods to expose and support topic IDs (describe, > delete, return the id on create) > > Make changes to kafka-topics.sh --describe so a user can specify a topic > name to describe with the --topic parameter, or alternatively the user can > supply a topic ID with the --topic_id parameter
[jira] [Updated] (KAFKA-10550) Update AdminClient and kafka-topics.sh to support topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-10550: --- Summary: Update AdminClient and kafka-topics.sh to support topic IDs (was: Update kafka-topics.sh to support topic IDs) > Update AdminClient and kafka-topics.sh to support topic IDs > --- > > Key: KAFKA-10550 > URL: https://issues.apache.org/jira/browse/KAFKA-10550 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Priority: Major > > Make changes to kafka-topics.sh --describe so a user can specify a topic > name to describe with the --topic parameter, or alternatively the user can > supply a topic ID with the --topic_id parameter
[jira] [Created] (KAFKA-10580) Add topic ID support to Fetch request
Justine Olshan created KAFKA-10580: -- Summary: Add topic ID support to Fetch request Key: KAFKA-10580 URL: https://issues.apache.org/jira/browse/KAFKA-10580 Project: Kafka Issue Type: Sub-task Reporter: Justine Olshan Prevent fetching a stale topic with topic IDs
[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
nym3r0s commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-704543777 @ableegoldman @hachikuji - Should I rebase these changes off of trunk (assuming these tests are fixed on trunk)?
[jira] [Updated] (KAFKA-10547) Add topic IDs to MetadataResponse, UpdateMetadata
[ https://issues.apache.org/jira/browse/KAFKA-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-10547: --- Summary: Add topic IDs to MetadataResponse, UpdateMetadata (was: Add topic IDs to MetadataResponse, UpdateMetadata, and Fetch) > Add topic IDs to MetadataResponse, UpdateMetadata > - > > Key: KAFKA-10547 > URL: https://issues.apache.org/jira/browse/KAFKA-10547 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Priority: Major > > Prevent reads from deleted topics > Will be able to use TopicDescription to identify the topic ID
[GitHub] [kafka] splett2 opened a new pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
splett2 opened a new pull request #9386: URL: https://github.com/apache/kafka/pull/9386 This PR implements the part of KIP-612 for adding IP throttling enforcement, and a ZK entity for configuring dynamic IP throttles. I will add `kafka-configs` support as well as `KafkaApi` reconfiguration support in a follow-up PR. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mjsax opened a new pull request #9385: KAFKA-9274: fix incorrect default value for `task.timeout.ms` config
mjsax opened a new pull request #9385: URL: https://github.com/apache/kafka/pull/9385 Part of KIP-572. Also add handler method to trigger/reset the timeout on a task. Call for review @vvcephei
[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor
mjsax commented on a change in pull request #9384: URL: https://github.com/apache/kafka/pull/9384#discussion_r500574894 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java ## @@ -203,8 +204,8 @@ public void shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceAndTrigg builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1"); final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); -createMockTaskManager(allTasks); adminClient = EasyMock.createMock(AdminClient.class); +createMockTaskManager(allTasks); Review comment: We need to set up the admin mock before the TM mock now (similar elsewhere)
[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor
mjsax commented on a change in pull request #9384: URL: https://github.com/apache/kafka/pull/9384#discussion_r500574645 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java ## @@ -131,12 +130,13 @@ // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor private void configurePartitionAssignorWith(final Map props) { +EasyMock.replay(taskManager, adminClient); + final Map configMap = configProps(); configMap.putAll(props); streamsConfig = new StreamsConfig(configMap); partitionAssignor.configure(configMap); -EasyMock.replay(taskManager, adminClient); Review comment: We need to set up the mocks before calling `partitionAssignor.configure()` now, as we call `taskManager#adminClient()` in this method (similar elsewhere)
[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor
mjsax commented on a change in pull request #9384: URL: https://github.com/apache/kafka/pull/9384#discussion_r500574127 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java ## @@ -291,11 +271,7 @@ public String userEndPoint() { } } -public Admin adminClient() { -return adminClient; -} - -public InternalTopicManager internalTopicManager() { +public InternalTopicManager internalTopicManager(final Admin adminClient) { Review comment: Instead of passing the `Admin` as a parameter, we could call `taskManager#adminClient()` in the next line; however, this would require making the method `public` (it's package-private atm), so I opted for parameter-passing instead.
[GitHub] [kafka] mjsax opened a new pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor
mjsax opened a new pull request #9384: URL: https://github.com/apache/kafka/pull/9384 Currently, we pass `AdminClient` and `TaskManager` into `StreamsPartitionAssignor` and use `TaskManager#mainConsumer()` to get access to the main consumer. However, TM also had a reference to `AdminClient` and thus we can simplify the setup by only passing the `TaskManager` reference. Call for review @vvcephei
[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override
scanterog commented on a change in pull request #9313: URL: https://github.com/apache/kafka/pull/9313#discussion_r500569567 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ## @@ -199,8 +199,8 @@ protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX; protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX; -protected static final String PRODUCER_CLIENT_PREFIX = "producer."; -protected static final String CONSUMER_CLIENT_PREFIX = "consumer."; +protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer."; +protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer."; Review comment: 🤦 got it. I misunderstood it. Let me give it a try.
[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override
mimaison commented on a change in pull request #9313: URL: https://github.com/apache/kafka/pull/9313#discussion_r500568357 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ## @@ -199,8 +199,8 @@ protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX; protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX; -protected static final String PRODUCER_CLIENT_PREFIX = "producer."; -protected static final String CONSUMER_CLIENT_PREFIX = "consumer."; +protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer."; +protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer."; Review comment: I meant add support for that format. We obviously want to keep supporting the existing formats
[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override
scanterog commented on a change in pull request #9313: URL: https://github.com/apache/kafka/pull/9313#discussion_r500566839 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ## @@ -199,8 +199,8 @@ protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX; protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX; -protected static final String PRODUCER_CLIENT_PREFIX = "producer."; -protected static final String CONSUMER_CLIENT_PREFIX = "consumer."; +protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer."; +protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer."; Review comment: For users running on a Connect cluster, that format is not supported right now:
```
source.producer.some-producer-setting: 123
```
The only supported format is (unless I'm missing something):
```
"producer.some-producer-setting": 123
```
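The two key formats discussed in this thread can be illustrated with a small sketch that accepts both the existing `producer.` prefix and the cluster-scoped `source.producer.` prefix. This is not the `MirrorConnectorConfig` code: `PrefixedConfigSketch`, `withPrefix`, and `producerOverrides` are hypothetical names, and the precedence rule (the cluster-scoped key wins when both are present) is an assumption made only so the example is well defined.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the MirrorMaker 2 implementation.
class PrefixedConfigSketch {

    // Return all entries whose key starts with `prefix`, with the prefix stripped from the key.
    static Map<String, Object> withPrefix(final Map<String, ?> props, final String prefix) {
        final Map<String, Object> result = new HashMap<>();
        for (final Map.Entry<String, ?> entry : props.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // Collect producer overrides from both formats.
    // Assumption: "source.producer.*" overrides "producer.*" when both define the same setting.
    static Map<String, Object> producerOverrides(final Map<String, ?> props) {
        final Map<String, Object> result = withPrefix(props, "producer.");
        result.putAll(withPrefix(props, "source.producer."));
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, Object> props = new HashMap<>();
        props.put("producer.some-producer-setting", 123);
        props.put("source.producer.other-setting", 456);
        props.put("consumer.unrelated", 1);
        System.out.println(producerOverrides(props));
    }
}
```

Keeping the plain `producer.` lookup in place is what preserves compatibility with existing configurations, which is the concern raised in the thread.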
[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override
mimaison commented on a change in pull request #9313: URL: https://github.com/apache/kafka/pull/9313#discussion_r500562684 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ## @@ -199,8 +199,8 @@ protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX; protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX; -protected static final String PRODUCER_CLIENT_PREFIX = "producer."; -protected static final String CONSUMER_CLIENT_PREFIX = "consumer."; +protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer."; +protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer."; Review comment: I don't think the format mentioned in https://github.com/apache/kafka/pull/9313#discussion_r498298987 would break compatibility.
[GitHub] [kafka] ijuma merged pull request #9336: MINOR: Don't publish javadocs for raft module
ijuma merged pull request #9336: URL: https://github.com/apache/kafka/pull/9336
[GitHub] [kafka] ijuma commented on pull request #9336: MINOR: Don't publish javadocs for raft module
ijuma commented on pull request #9336: URL: https://github.com/apache/kafka/pull/9336#issuecomment-704521581 Sounds good.
[GitHub] [kafka] hachikuji commented on pull request #9336: MINOR: Don't publish javadocs for raft module
hachikuji commented on pull request #9336: URL: https://github.com/apache/kafka/pull/9336#issuecomment-704519950 @ijuma Thanks, LGTM. I think we can consider exposing an API eventually, but for now I'd like to have full flexibility to change it.
[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
dajac commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r500544582 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -1321,25 +1106,15 @@ object GroupMetadataManager { */ def readMessageKey(buffer: ByteBuffer): BaseKey = { val version = buffer.getShort -val keySchema = schemaForKey(version) -val key = keySchema.read(buffer) - -if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) { +if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) { // version 0 and 1 refer to offset - val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String] - val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String] - val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int] - - OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition))) - -} else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) { + val key = new OffsetCommitKey(new ByteBufferAccessor(buffer), version) + OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition))) +} else if (version >= GroupMetadataKeyData.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKeyData.HIGHEST_SUPPORTED_VERSION) { // version 2 refers to offset Review comment: nit: Not related to your changes but could fix this comment? `refers to offset` => `refers to group metadata`? 
## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -1257,60 +1070,32 @@ object GroupMetadataManager { assignment: Map[String, Array[Byte]], apiVersion: ApiVersion): Array[Byte] = { -val (version, value) = { - if (apiVersion < KAFKA_0_10_1_IV0) -(0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0)) - else if (apiVersion < KAFKA_2_1_IV0) -(1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1)) - else if (apiVersion < KAFKA_2_3_IV0) -(2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2)) - else -(3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3)) -} - -value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse("")) -value.set(GENERATION_KEY, groupMetadata.generationId) -value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull) -value.set(LEADER_KEY, groupMetadata.leaderOrNull) - -if (version >= 2) - value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault) - -val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata => - val memberStruct = value.instance(MEMBERS_KEY) - memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId) - memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId) - memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost) - memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs) - - if (version > 0) -memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs) - - if (version >= 3) -memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull) - - // The group is non-empty, so the current protocol must be defined - val protocol = groupMetadata.protocolName.orNull - if (protocol == null) -throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol") - - val metadata = memberMetadata.metadata(protocol) - memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata)) - - val memberAssignment = assignment(memberMetadata.memberId) - assert(memberAssignment != null) - - 
memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment)) - - memberStruct -} - -value.set(MEMBERS_KEY, memberArray.toArray) - -val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) -byteBuffer.putShort(version) -value.writeTo(byteBuffer) -byteBuffer.array() +val version = + if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort + else if (apiVersion < KAFKA_2_1_IV0) 1.toShort + else if (apiVersion < KAFKA_2_3_IV0) 2.toShort + else 3.toShort + +serializeMessage(version, new GroupMetadataValue() + .setProtocolType(groupMetadata.protocolType.getOrElse("")) + .setGeneration(groupMetadata.generationId) + .setProtocol(groupMetadata.protocolName.orNull) + .setLeader(groupMetadata.leaderOrNull) + .setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault) + .setMembers(groupMetadata.allMemberMetadata.map { memberMetadata => +new GroupMetadataValue.MemberMetadata() + .setMemberId(memberMetadata.memberId) + .setClientId(memberMetadata.clientId) + .setClientHost(memberMetadata.clientHost) +
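The version-based dispatch in `readMessageKey` can be sketched on its own as follows. The version ranges are taken from the comments in the diff ("version 0 and 1 refer to offset", and version 2 for group metadata, per the review nit); everything else is hypothetical: `KeyVersionDispatchSketch`, `keyKind`, the returned strings, and the choice to throw `IllegalStateException` for unknown versions are assumptions for illustration, not the `GroupMetadataManager` code.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of version-range dispatch; not the GroupMetadataManager implementation.
class KeyVersionDispatchSketch {

    // Assumed supported version ranges, based on the comments in the diff.
    static final short OFFSET_KEY_LOWEST = 0;
    static final short OFFSET_KEY_HIGHEST = 1;
    static final short GROUP_KEY_LOWEST = 2;
    static final short GROUP_KEY_HIGHEST = 2;

    // Read the leading 2-byte version and decide which kind of key follows.
    static String keyKind(final ByteBuffer buffer) {
        final short version = buffer.getShort();
        if (version >= OFFSET_KEY_LOWEST && version <= OFFSET_KEY_HIGHEST) {
            return "offset";
        } else if (version >= GROUP_KEY_LOWEST && version <= GROUP_KEY_HIGHEST) {
            return "group-metadata";
        } else {
            throw new IllegalStateException("Unknown key version " + version);
        }
    }

    // Build a buffer containing only a version prefix, ready for reading.
    static ByteBuffer bufferWithVersion(final short version) {
        final ByteBuffer buffer = ByteBuffer.allocate(2);
        buffer.putShort(version);
        buffer.flip();
        return buffer;
    }

    public static void main(final String[] args) {
        System.out.println(keyKind(bufferWithVersion((short) 1)));
        System.out.println(keyKind(bufferWithVersion((short) 2)));
    }
}
```

Checking against each schema's lowest and highest supported version, rather than a single "current version" constant, keeps the dispatch correct if either key schema gains a new version later.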
[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
ableegoldman commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-704502487 LGTM! Looks like the builds just failed with some unrelated flaky tests:
```
Build / JDK 8 / org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
Build / JDK 8 / kafka.api.TransactionsTest.testBumpTransactionalEpoch
```
[jira] [Updated] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
[ https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-10579: Labels: flaky-test (was: ) > Flaky test > connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy > > > Key: KAFKA-10579 > URL: https://issues.apache.org/jira/browse/KAFKA-10579 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test > > > {{java.lang.NullPointerException > at > java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) > at org.reflections.Store.getAllIncluding(Store.java:82) > at org.reflections.Store.getAll(Store.java:93) > at org.reflections.Reflections.getSubTypesOf(Reflections.java:404) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) > at > org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) > at > org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) > at > org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167) > at > 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)}} > {{}} > https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222{{}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
Sophie Blee-Goldman created KAFKA-10579: --- Summary: Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy Key: KAFKA-10579 URL: https://issues.apache.org/jira/browse/KAFKA-10579 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sophie Blee-Goldman {{java.lang.NullPointerException at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) at org.reflections.Store.getAllIncluding(Store.java:82) at org.reflections.Store.getAll(Store.java:93) at org.reflections.Reflections.getSubTypesOf(Reflections.java:404) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) at org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167) at org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)}} {{}} https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222{{}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #9373: KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores
ableegoldman commented on pull request #9373: URL: https://github.com/apache/kafka/pull/9373#issuecomment-704493219 FYI this should be cherrypicked back to 2.6 once merged This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] brbrown25 commented on pull request #9057: KAFKA-10299: Implementing Kafka Connect Hash SMT to allow for hashing…
brbrown25 commented on pull request #9057: URL: https://github.com/apache/kafka/pull/9057#issuecomment-704492883 @mimaison I've updated the KIP and put out a vote request, but I'm not seeing any action. I've also updated this PR with the latest trunk.
[jira] [Commented] (KAFKA-9831) Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
[ https://issues.apache.org/jira/browse/KAFKA-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17209038#comment-17209038 ] Sophie Blee-Goldman commented on KAFKA-9831: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9373/3/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/Build___JDK_11___shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta_/] h3. Stacktrace java.lang.AssertionError: Expected: <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 45)]> but: was <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 45), KeyValue(1, 55)]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.EosIntegrationTest.checkResultPerKey(EosIntegrationTest.java:281) at org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:480) > Failing test: > EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta] > -- > > Key: KAFKA-9831 > URL: https://issues.apache.org/jira/browse/KAFKA-9831 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: John Roesler >Assignee: Matthias J. Sax >Priority: Major > Attachments: one.stdout.txt, two.stdout.txt > > > I've seen this fail twice in a row on the same build, but with different > errors. Stacktraces follow; stdout is attached. 
> One: > {noformat} > java.lang.AssertionError: Did not receive all 40 records from topic > singlePartitionOutputTopic within 6 ms > Expected: is a value equal to or greater than <40> > but: <39> was less than <40> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:517) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:513) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:491) > at > org.apache.kafka.streams.integration.EosIntegrationTest.readResult(EosIntegrationTest.java:766) > at > org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:473) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at
[GitHub] [kafka] guozhangwang commented on a change in pull request #8988: KAFKA-10199: Separate restore threads
guozhangwang commented on a change in pull request #8988: URL: https://github.com/apache/kafka/pull/8988#discussion_r500521516 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ## @@ -65,10 +65,6 @@ public ProcessorContextImpl(final TaskId id, @Override public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) { -if (stateManager.taskType() != TaskType.ACTIVE) { Review comment: Actually I realized that we call this function sometimes (e.g. upon initialization) even when the task is already active, so we cannot just reject and fail if it was not in standby
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r500521571 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -384,9 +390,8 @@ public static StreamThread create(final InternalTopologyBuilder builder, builder, threadId, logContext, -assignmentErrorCode, Review comment: this diff is a bit off. The assignmentErrorCode was added in this PR. I didn't change the order of the params for no reason...
[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
guozhangwang commented on pull request #9020: URL: https://github.com/apache/kafka/pull/9020#issuecomment-704474872 test this
[GitHub] [kafka] guozhangwang closed pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
guozhangwang closed pull request #9020: URL: https://github.com/apache/kafka/pull/9020
[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
guozhangwang commented on pull request #9020: URL: https://github.com/apache/kafka/pull/9020#issuecomment-704462953 test this
[GitHub] [kafka] guozhangwang merged pull request #9321: KAFKA-9929: fix: add missing default implementations
guozhangwang merged pull request #9321: URL: https://github.com/apache/kafka/pull/9321
[GitHub] [kafka] guozhangwang commented on pull request #9321: KAFKA-9929: fix: add missing default implementations
guozhangwang commented on pull request #9321: URL: https://github.com/apache/kafka/pull/9321#issuecomment-704461565 LGTM!
[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
guozhangwang commented on pull request #9020: URL: https://github.com/apache/kafka/pull/9020#issuecomment-704460681 test this please
[GitHub] [kafka] rhauch merged pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop
rhauch merged pull request #8910: URL: https://github.com/apache/kafka/pull/8910
[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
guozhangwang commented on pull request #9020: URL: https://github.com/apache/kafka/pull/9020#issuecomment-704460234 test this please
[jira] [Resolved] (KAFKA-10338) Support PEM format for SSL certificates and private key
[ https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-10338. Fix Version/s: 2.7.0 Reviewer: Manikumar Resolution: Fixed > Support PEM format for SSL certificates and private key > --- > > Key: KAFKA-10338 > URL: https://issues.apache.org/jira/browse/KAFKA-10338 > Project: Kafka > Issue Type: New Feature > Components: security >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.7.0 > > > We currently support only file-based JKS/PKCS12 format for SSL key stores and > trust stores. It will be good to add support for PEM as configuration values > that fits better with config externalization.
[GitHub] [kafka] rajinisivaram merged pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)
rajinisivaram merged pull request #9345: URL: https://github.com/apache/kafka/pull/9345
[GitHub] [kafka] rajinisivaram commented on pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)
rajinisivaram commented on pull request #9345: URL: https://github.com/apache/kafka/pull/9345#issuecomment-704456290 @omkreddy Thanks for the review, merging to trunk.
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17209006#comment-17209006 ] Sophie Blee-Goldman commented on KAFKA-5998: Yeah I think both of those (running with shared state directory and using /tmp) can each lead to this exception. For testing on your local machine you should just configure each of the instances to use a different state dir (preferably something other than /tmp, but that is up to you). See [the docs|https://kafka.apache.org/26/documentation/streams/developer-guide/config-streams.html#state-dir] for this config: > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Assignee: John Roesler >Priority: Critical > Fix For: 2.2.2, 2.4.0, 2.3.1 > > Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, > exc.txt, props.txt, streams.txt > > > I have one kafka broker and one kafka stream running... I am running its > since two days under load of around 2500 msgs per second.. 
On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) >
[jira] [Commented] (KAFKA-10520) InitProducerId may be blocked if least loaded node is not ready to send
[ https://issues.apache.org/jira/browse/KAFKA-10520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17208995#comment-17208995 ] Sophie Blee-Goldman commented on KAFKA-10520: - Hey [~rsivaram], is this something we can get fixed for 2.7? I'm just asking because the freeze deadlines are approaching, and this seems like it might be a simple fix for a pretty much fatal error (although workarounds do exist) > InitProducerId may be blocked if least loaded node is not ready to send > --- > > Key: KAFKA-10520 > URL: https://issues.apache.org/jira/browse/KAFKA-10520 > Project: Kafka > Issue Type: Bug > Components: producer >Reporter: Rajini Sivaram >Priority: Major > Fix For: 2.7.0 > > > From the logs of a failing producer that shows InitProducerId timing out > after request timeout, it looks like we don't poll while waiting for > transactional producer to be initialized and FindCoordinator request cannot > be sent. The producer configuration used one bootstrap server and > `max.in.flight.requests.per.connection=1`. The failing sequence: > # Producer sends MetadataRequest to least loaded node (bootstrap server) > # Producer is ready to send InitProducerId, needs to find transaction > coordinator > # Producer creates FindCoordinator request, but the only node known is the > bootstrap server. Producer cannot send to this node since there is already > the Metadata request in flight and max.inflight is 1. > # Producer waits without polling, so Metadata response is not processed. > InitProducerId times out eventually. > > > We need to update the condition used to determine whether Sender should > poll() to fix this issue.
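The failing sequence described in KAFKA-10520 can be illustrated with a toy model. This is only a sketch in plain Python, not the producer's Sender or NetworkClient code: `ToyConnection` and `try_find_coordinator` are hypothetical names, and the attempt counter stands in for the request timeout.

```python
class ToyConnection:
    """Hypothetical stand-in for a single broker connection with an in-flight limit."""

    def __init__(self, max_in_flight=1):
        self.max_in_flight = max_in_flight
        self.in_flight = []

    def send(self, request):
        # A request can only be sent while there is a free in-flight slot.
        if len(self.in_flight) >= self.max_in_flight:
            return False
        self.in_flight.append(request)
        return True

    def poll(self):
        # Processing a response frees the oldest in-flight slot.
        return self.in_flight.pop(0) if self.in_flight else None


def try_find_coordinator(poll_while_waiting, max_attempts=5):
    """Return True if FindCoordinator could be sent before the (modelled) timeout."""
    conn = ToyConnection(max_in_flight=1)
    conn.send("Metadata")  # step 1: Metadata request to the only known node
    for _ in range(max_attempts):
        if conn.send("FindCoordinator"):  # step 3: needs a free slot on that node
            return True
        if poll_while_waiting:
            conn.poll()  # the proposed fix: keep polling so the response is processed
    return False  # step 4: never polled, so the slot is never freed


print(try_find_coordinator(poll_while_waiting=False))  # False
print(try_find_coordinator(poll_while_waiting=True))   # True
```

In this model the only difference between the two runs is whether the waiting loop polls, which matches the ticket's conclusion that the poll condition is what needs to change.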
[GitHub] [kafka] piotrrzysko commented on pull request #9371: KAFKA-10510: Validate replication factor consistency on reassignment
piotrrzysko commented on pull request #9371: URL: https://github.com/apache/kafka/pull/9371#issuecomment-704430920 Thanks for the review, @stanislavkozlovski. If I understand your point correctly, you are wondering whether this validation will block changing the replication factor for all partitions (replication factor of a topic). Right? It will not if all requested reassignments in the batch have the same number of target replicas for a given topic. Please take a look at the test `ReassignPartitionsIntegrationTest::testReassignmentFailOnInconsistentReplicationFactorBetweenPartitions` - topic `bar`. It is a simple case because the topic has only one partition, but it shows that changing the replication factor from 3 to 4 is possible.
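The rule stated in the comment above (every requested reassignment for a topic in one batch must have the same number of target replicas) can be sketched as follows. This is a minimal illustration, not the code in the PR: `inconsistent_topics` is a hypothetical helper, and it models only the within-batch check described in the comment.

```python
from collections import defaultdict


def inconsistent_topics(reassignments):
    """Return topics whose requested partitions disagree on replica count.

    `reassignments` maps (topic, partition) to the list of target replica
    broker ids requested in a single batch.
    """
    replica_counts = defaultdict(set)
    for (topic, _partition), replicas in reassignments.items():
        replica_counts[topic].add(len(replicas))
    return sorted(t for t, counts in replica_counts.items() if len(counts) > 1)


batch = {
    ("foo", 0): [0, 1, 2],
    ("foo", 1): [0, 1, 2, 3],  # differs from foo-0, so foo is rejected
    ("bar", 0): [0, 1, 2, 3],  # single partition going from 3 to 4 replicas is fine
}
print(inconsistent_topics(batch))  # ['foo']
```

Under this rule, raising a topic's replication factor stays possible as long as every partition of that topic in the batch is given the same number of target replicas.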
[jira] [Resolved] (KAFKA-10527) Voters should always initialize as followers
[ https://issues.apache.org/jira/browse/KAFKA-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10527. - Resolution: Fixed > Voters should always initialize as followers > > > Key: KAFKA-10527 > URL: https://issues.apache.org/jira/browse/KAFKA-10527 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > The current state initialization logic preserves whatever state the broker > was in when it was shut down. In particular, if the node was previously a > leader, it will remain a leader. This can be dangerous if we want to consider > optimizations such as in KAFKA-10526 since the leader might lose unflushed > data following the restart. It would be safer to always initialize as a > follower so that a leader's tenure never crosses process restarts. This helps > to guarantee the uniqueness of the (offset, epoch) tuple which the > replication protocol depends on.
[GitHub] [kafka] rondagostino commented on a change in pull request #9378: MINOR: ACLs for secured cluster system tests
rondagostino commented on a change in pull request #9378: URL: https://github.com/apache/kafka/pull/9378#discussion_r500465547 ## File path: tests/kafkatest/services/kafka/kafka.py ## @@ -451,7 +451,8 @@ def _kafka_topics_cmd(self, node, force_use_zk_connection): set. If Admin client is not going to be used, don't set the environment variable. """ kafka_topic_script = self.path.script("kafka-topics.sh", node) -skip_security_settings = force_use_zk_connection or not self.all_nodes_topic_command_supports_bootstrap_server() +skip_security_settings = force_use_zk_connection or not self.all_nodes_topic_command_supports_bootstrap_server() \ + or self.interbroker_security_protocol == SecurityConfig.PLAINTEXT Review comment: Yes, good point, I think it would be helpful to consolidate the assumptions into one method (or a couple or a few, at most). I'll see what I can do about refactoring those assumptions out and reusing the logic instead of having it sprinkled around so much as it is now.
[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override
scanterog commented on a change in pull request #9313: URL: https://github.com/apache/kafka/pull/9313#discussion_r500464804 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ## @@ -199,8 +199,8 @@ protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX; protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX; -protected static final String PRODUCER_CLIENT_PREFIX = "producer."; -protected static final String CONSUMER_CLIENT_PREFIX = "consumer."; +protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer."; +protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer."; Review comment: @mimaison just a friendly ping.
[GitHub] [kafka] rondagostino commented on a change in pull request #9378: MINOR: ACLs for secured cluster system tests
rondagostino commented on a change in pull request #9378: URL: https://github.com/apache/kafka/pull/9378#discussion_r500464258 ## File path: tests/kafkatest/services/kafka/kafka.py ## @@ -983,7 +988,10 @@ def _topic_command_connect_setting(self, node, force_use_zk_connection): bootstrap server, otherwise returns zookeeper connection string. """ if not force_use_zk_connection and self.all_nodes_topic_command_supports_bootstrap_server(): -connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.security_protocol)) +if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: +connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.interbroker_security_protocol)) +else: +connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.security_protocol)) Review comment: This code is identifying the port to contact when we are using `--bootstrap-server` instead of `--zookeeper`, and we use the port associated with `PLAINTEXT` rather than `SASL_{PLAINTEXT,SSL}` or `SSL` if `PLAINTEXT` is in use. The assumption is that if `PLAINTEXT` is in use it will be the inter-broker security protocol rather than (or in addition to) the client security protocol, which is the case.
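The listener choice explained above could be pulled into one helper, along the lines of the consolidation discussed in this review thread. The following is a sketch under stated assumptions, not the kafkatest code: `tool_security_protocol` and `topic_command_connect_setting` are hypothetical free functions, `bootstrap_servers` is passed in as a callable, and the host and ports in the usage lines are placeholders.

```python
PLAINTEXT = "PLAINTEXT"


def tool_security_protocol(client_protocol, interbroker_protocol):
    """Pick the protocol whose listener CLI tools should contact.

    Mirrors the assumption in the review comment: if PLAINTEXT is in use
    at all, it is the inter-broker protocol, so prefer that listener.
    """
    if interbroker_protocol == PLAINTEXT:
        return interbroker_protocol
    return client_protocol


def topic_command_connect_setting(bootstrap_servers, client_protocol, interbroker_protocol):
    """Build the --bootstrap-server argument for the chosen listener."""
    protocol = tool_security_protocol(client_protocol, interbroker_protocol)
    return "--bootstrap-server %s" % bootstrap_servers(protocol)


# Placeholder listener table; real ports come from the service configuration.
listeners = {"PLAINTEXT": "broker1:9092", "SASL_SSL": "broker1:9095"}
print(topic_command_connect_setting(listeners.get, "SASL_SSL", "PLAINTEXT"))
print(topic_command_connect_setting(listeners.get, "SASL_SSL", "SASL_SSL"))
```

Keeping the decision in a single function means the `skip_security_settings` check and the connection string are derived from the same assumption instead of repeating the `PLAINTEXT` comparison in several places.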
[GitHub] [kafka] hachikuji merged pull request #9348: KAFKA-10527; Voters should not reinitialize as leader in same epoch
hachikuji merged pull request #9348: URL: https://github.com/apache/kafka/pull/9348
[GitHub] [kafka] rondagostino commented on a change in pull request #9378: MINOR: ACLs for secured cluster system tests
rondagostino commented on a change in pull request #9378: URL: https://github.com/apache/kafka/pull/9378#discussion_r500460487 ## File path: tests/kafkatest/services/security/kafka_acls.py ## @@ -93,11 +97,13 @@ def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False): force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server() -cmd = "%(cmd_prefix)s --add --cluster --operation=ClusterAction --allow-principal=%(principal)s" % { -'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection), -'principal': principal -} -kafka.run_cli_tool(node, cmd) +for operation in ['ClusterAction', 'Alter', 'Create']: Review comment: Yes, Alter is needed to create user SCRAM credentials, and Create is needed to create topics. When we start up a cluster we create the `__consumer_offsets` topic and a `test_topic` (typically). If the test is using SCRAM we also create the SCRAM credentials at this point. We now use `--bootstrap-server` instead of `--zookeeper` for these CLI operations, and without these ACLs a system test will not be able to perform these necessary actions if security is enabled.
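The loop in the diff above issues one ACL command per cluster operation. A minimal sketch of how those command strings are assembled, assuming the same flags as the removed single-operation line: `cluster_acl_commands` is a hypothetical helper, and the prefix and principal in the usage lines are placeholders rather than values from the system tests.

```python
def cluster_acl_commands(cmd_prefix, principal,
                         operations=("ClusterAction", "Alter", "Create")):
    """Return one ACL command string per cluster-level operation."""
    return [
        "%(cmd_prefix)s --add --cluster --operation=%(operation)s "
        "--allow-principal=%(principal)s" % {
            "cmd_prefix": cmd_prefix,
            "operation": operation,
            "principal": principal,
        }
        for operation in operations
    ]


# Placeholder prefix and principal, for illustration only.
for cmd in cluster_acl_commands("kafka-acls.sh --bootstrap-server broker1:9092",
                                "User:client"):
    print(cmd)
```

Passing the operations as a parameter keeps the granted set explicit, which makes it easier to grant only what a given test needs.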
[GitHub] [kafka] lct45 commented on pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur
lct45 commented on pull request #9383: URL: https://github.com/apache/kafka/pull/9383#issuecomment-704410260 @ableegoldman ready for initial review
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9378: MINOR: ACLs for secured cluster system tests
rajinisivaram commented on a change in pull request #9378: URL: https://github.com/apache/kafka/pull/9378#discussion_r500429377 ## File path: tests/kafkatest/services/kafka/kafka.py ## @@ -451,7 +451,8 @@ def _kafka_topics_cmd(self, node, force_use_zk_connection): set. If Admin client is not going to be used, don't set the environment variable. """ kafka_topic_script = self.path.script("kafka-topics.sh", node) -skip_security_settings = force_use_zk_connection or not self.all_nodes_topic_command_supports_bootstrap_server() +skip_security_settings = force_use_zk_connection or not self.all_nodes_topic_command_supports_bootstrap_server() \ + or self.interbroker_security_protocol == SecurityConfig.PLAINTEXT Review comment: should we add a method to KafkaService or SecurityConfig that tells you which listener to use for tools? Using inter-broker here looks a bit odd, even though it becomes clear when you look at the other changes. ## File path: tests/kafkatest/services/kafka/kafka.py ## @@ -983,7 +988,10 @@ def _topic_command_connect_setting(self, node, force_use_zk_connection): bootstrap server, otherwise returns zookeeper connection string. """ if not force_use_zk_connection and self.all_nodes_topic_command_supports_bootstrap_server(): -connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.security_protocol)) +if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: +connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.interbroker_security_protocol)) +else: +connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.security_protocol)) Review comment: Is this assuming that `else` section always means PLAINTEXT since there are no security configs here? 
## File path: tests/kafkatest/services/security/kafka_acls.py ## @@ -93,11 +97,13 @@ def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False): force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server() -cmd = "%(cmd_prefix)s --add --cluster --operation=ClusterAction --allow-principal=%(principal)s" % { -'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection), -'principal': principal -} -kafka.run_cli_tool(node, cmd) +for operation in ['ClusterAction', 'Alter', 'Create']: Review comment: we are adding more ACLs than we had before. Are they necessary - it is better to limit ACLs unless they are necessary for the tests.
[GitHub] [kafka] junrao commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
junrao commented on pull request #9001: URL: https://github.com/apache/kafka/pull/9001#issuecomment-704393633 @abbccdda : Any more comments from you?
[GitHub] [kafka] guozhangwang commented on pull request #9358: MINOR: Refactor unit tests around RocksDBConfigSetter
guozhangwang commented on pull request #9358: URL: https://github.com/apache/kafka/pull/9358#issuecomment-704386882 Thanks @abbccdda for the reviews!
[GitHub] [kafka] guozhangwang merged pull request #9358: MINOR: Refactor unit tests around RocksDBConfigSetter
guozhangwang merged pull request #9358: URL: https://github.com/apache/kafka/pull/9358
[jira] [Updated] (KAFKA-10578) KIP-675: Convert KTable to a KStream using the previous value
[ https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Javier Freire Riobó updated KAFKA-10578: Issue Type: Improvement (was: Wish) > KIP-675: Convert KTable to a KStream using the previous value > - > > Key: KAFKA-10578 > URL: https://issues.apache.org/jira/browse/KAFKA-10578 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Javier Freire Riobó >Priority: Minor > > Imagine that we have an entity for which we want to emit the difference > between the current and the previous state. The simplest case would be that > the entity was an integer number and you want to emit the subtraction between > the current and previous values. > For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 > (3 - 6) is expected. > The way to achieve this with kafka streams would be through an aggregate. > The main problem, apart from needing more code, is that if the same event is > received twice at the same time and the commit time is not 0, the difference > is deleted and nothing is emitted. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
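The expected output in the description above can be checked with a few lines of plain Python. This is only a sketch of the semantics (each emitted value is the current value minus the previous one, starting from an assumed initial value of 0); it does not use Kafka Streams, and `emit_differences` is a hypothetical name.

```python
def emit_differences(values, initial=0):
    """Return current - previous for each value, starting from `initial`."""
    previous = initial
    differences = []
    for current in values:
        differences.append(current - previous)
        previous = current
    return differences


print(emit_differences([4, 6, 3]))  # [4, 2, -3]
```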
[GitHub] [kafka] lct45 opened a new pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur
lct45 opened a new pull request #9383: URL: https://github.com/apache/kafka/pull/9383 Add data to subscriptionUserData to make sure that it's different each time a consumer rejoins ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
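The PR description only says that extra data is added so the subscription user data differs on every rejoin. One way that could work is sketched below, assuming a one-byte counter appended to the payload. The class, the field name `unique_field`, and the encoding are hypothetical and are not taken from the PR.

```python
class SubscriptionUserData:
    """Hypothetical stand-in for the bytes a consumer sends when it joins a group."""

    def __init__(self, payload):
        self.payload = payload
        self.unique_field = 0  # assumed one-byte counter, bumped on each rejoin

    def encode(self):
        # Append the counter so two otherwise identical subscriptions differ.
        return self.payload + bytes([self.unique_field])

    def next_rejoin(self):
        # Wrap at 256 so the counter always fits in a single byte.
        self.unique_field = (self.unique_field + 1) % 256
        return self.encode()
```

With identical payloads, consecutive calls to `next_rejoin()` still produce different byte strings, which is the property the description asks for.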
[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on pull request #9224: URL: https://github.com/apache/kafka/pull/9224#issuecomment-704353923 @mimaison I think this PR is ready for an initial review. Please take a look when you have time. Thanks.
[jira] [Updated] (KAFKA-6733) Support of printing additional ConsumerRecord fields in DefaultMessageFormatter
[ https://issues.apache.org/jira/browse/KAFKA-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-6733: --- Fix Version/s: 2.7.0 > Support of printing additional ConsumerRecord fields in > DefaultMessageFormatter > --- > > Key: KAFKA-6733 > URL: https://issues.apache.org/jira/browse/KAFKA-6733 > Project: Kafka > Issue Type: Improvement > Components: tools >Affects Versions: 2.7.0 >Reporter: Mateusz Zakarczemny >Assignee: Badai Aqrandista >Priority: Minor > Fix For: 2.7.0 > > > It would be useful to have possibility of printing headers, partition and > offset in ConsoleConsumer. Especially support of headers seems to be missing.
[jira] [Updated] (KAFKA-6733) Support of printing additional ConsumerRecord fields in DefaultMessageFormatter
[ https://issues.apache.org/jira/browse/KAFKA-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-6733: --- Affects Version/s: 2.7.0 > Support of printing additional ConsumerRecord fields in > DefaultMessageFormatter > --- > > Key: KAFKA-6733 > URL: https://issues.apache.org/jira/browse/KAFKA-6733 > Project: Kafka > Issue Type: Improvement > Components: tools >Affects Versions: 2.7.0 >Reporter: Mateusz Zakarczemny >Assignee: Badai Aqrandista >Priority: Minor > > It would be useful to have possibility of printing headers, partition and > offset in ConsoleConsumer. Especially support of headers seems to be missing.
[GitHub] [kafka] bbejeck commented on pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
bbejeck commented on pull request #9237: URL: https://github.com/apache/kafka/pull/9237#issuecomment-704347684 Java 8 failed with `kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault`. Java 11 failed with `org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta`. Java 15 passed.
[GitHub] [kafka] coded9 commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null
coded9 commented on pull request #8575: URL: https://github.com/apache/kafka/pull/8575#issuecomment-704328653 Any update or possible workaround? @rhauch
[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r500361487 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -997,174 +996,7 @@ object GroupMetadataManager { val MetricsGroup: String = "group-coordinator-metrics" val LoadTimeSensor: String = "GroupPartitionLoadTime" - private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort - private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort - - private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING), -new Field("topic", STRING), -new Field("partition", INT32)) - private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group") - private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic") - private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") - private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64), -new Field("expire_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") - private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V2 
= new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema( -new Field("offset", INT64), -new Field("leader_epoch", INT32), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset") - private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch") - private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp") - - private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) - private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") - - private val MEMBER_ID_KEY = "member_id" - private val GROUP_INSTANCE_ID_KEY = "group_instance_id" - private val CLIENT_ID_KEY = "client_id" - private val CLIENT_HOST_KEY = "client_host" - private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout" - private val SESSION_TIMEOUT_KEY = "session_timeout" - private val SUBSCRIPTION_KEY = "subscription" - private val ASSIGNMENT_KEY = "assignment" - - private val MEMBER_METADATA_V0 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V1 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new 
Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1 - - private val MEMBER_METADATA_V3 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val PROTOCOL_TYPE_KEY = "protocol_type" - private val GENERATION_KEY = "generation" - private val PROTOCOL_KEY = "protocol" - private val LEADER_KEY = "leader" -
[GitHub] [kafka] rajinisivaram commented on pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)
rajinisivaram commented on pull request #9345: URL: https://github.com/apache/kafka/pull/9345#issuecomment-704327619 @omkreddy Thanks for the review, have addressed the comments.
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)
rajinisivaram commented on a change in pull request #9345: URL: https://github.com/apache/kafka/pull/9345#discussion_r500360036 ## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ## @@ -167,17 +181,12 @@ public void testValidEndpointIdentificationSanIp() throws Exception { @Test public void testValidEndpointIdentificationCN() throws Exception { String node = "0"; Review comment: fixed these and few other warnings
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)
rajinisivaram commented on a change in pull request #9345: URL: https://github.com/apache/kafka/pull/9345#discussion_r500359056 ## File path: clients/src/test/java/org/apache/kafka/common/network/CertStores.java ## @@ -54,13 +60,30 @@ public CertStores(boolean server, String commonName, InetAddress hostAddress) th } private CertStores(boolean server, String commonName, TestSslUtils.CertificateBuilder certBuilder) throws Exception { +this(server, commonName, "RSA", certBuilder, false); +} + +private CertStores(boolean server, String commonName, String keyAlgorithm, TestSslUtils.CertificateBuilder certBuilder, boolean usePem) throws Exception { String name = server ? "server" : "client"; Mode mode = server ? Mode.SERVER : Mode.CLIENT; -File truststoreFile = File.createTempFile(name + "TS", ".jks"); -sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name, commonName, certBuilder); +File truststoreFile = usePem ? null : File.createTempFile(name + "TS", ".jks"); +sslConfig = new SslConfigsBuilder(mode) +.useClientCert(!server) +.certAlias(name) +.cn(commonName) +.createNewTrustStore(truststoreFile) +.certBuilder(certBuilder) +.algorithm(keyAlgorithm) +.usePem(usePem) +.build(); } + public Map getTrustingConfig(CertStores truststoreConfig) { +return getTrustingConfig(truststoreConfig, false); +} + +public Map getTrustingConfig(CertStores truststoreConfig, boolean usePemCerts) { Review comment: removed
[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
bbejeck commented on pull request #9099: URL: https://github.com/apache/kafka/pull/9099#issuecomment-704322766 Thanks for the contribution @badaiaqrandista!
[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
bbejeck commented on pull request #9099: URL: https://github.com/apache/kafka/pull/9099#issuecomment-704322552 Merged #9099 into trunk
[GitHub] [kafka] bbejeck merged pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
bbejeck merged pull request #9099: URL: https://github.com/apache/kafka/pull/9099
[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r500352497 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -1257,60 +1059,31 @@ object GroupMetadataManager { assignment: Map[String, Array[Byte]], apiVersion: ApiVersion): Array[Byte] = { -val (version, value) = { - if (apiVersion < KAFKA_0_10_1_IV0) -(0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0)) - else if (apiVersion < KAFKA_2_1_IV0) -(1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1)) - else if (apiVersion < KAFKA_2_3_IV0) -(2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2)) - else -(3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3)) -} - -value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse("")) -value.set(GENERATION_KEY, groupMetadata.generationId) -value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull) -value.set(LEADER_KEY, groupMetadata.leaderOrNull) - -if (version >= 2) - value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault) - -val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata => - val memberStruct = value.instance(MEMBERS_KEY) - memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId) - memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId) - memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost) - memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs) - - if (version > 0) -memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs) - - if (version >= 3) -memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull) - - // The group is non-empty, so the current protocol must be defined - val protocol = groupMetadata.protocolName.orNull - if (protocol == null) -throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol") - - val metadata = memberMetadata.metadata(protocol) - memberStruct.set(SUBSCRIPTION_KEY, 
ByteBuffer.wrap(metadata)) - - val memberAssignment = assignment(memberMetadata.memberId) - assert(memberAssignment != null) - - memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment)) - - memberStruct -} - -value.set(MEMBERS_KEY, memberArray.toArray) - -val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) -byteBuffer.putShort(version) -value.writeTo(byteBuffer) -byteBuffer.array() +val version = + if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort + else if (apiVersion < KAFKA_2_1_IV0) 1.toShort + else if (apiVersion < KAFKA_2_3_IV0) 2.toShort + else 3.toShort + +serializeMessage(version, new GroupMetadataValue() + .setProtocolType(groupMetadata.protocolType.getOrElse("")) + .setGeneration(groupMetadata.generationId) + .setProtocol(groupMetadata.protocolName.orNull) + .setLeader(groupMetadata.leaderOrNull) + .setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault) + .setMembers(groupMetadata.allMemberMetadata.map(memberMetadata => +new GroupMetadataValue.MemberMetadata() + .setMemberId(memberMetadata.memberId) + .setClientId(memberMetadata.clientId) + .setClientHost(memberMetadata.clientHost) + .setSessionTimeout(memberMetadata.sessionTimeoutMs) + .setRebalanceTimeout(memberMetadata.rebalanceTimeoutMs) + .setGroupInstanceId(memberMetadata.groupInstanceId.orNull) + .setSubscription(groupMetadata.protocolName.map(memberMetadata.metadata) +.getOrElse(throw new IllegalStateException("The group is non-empty so the current protocol must be defined"))) Review comment: ok. keep previous error message
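The version selection in the diff above is a chain of "less than threshold" checks. The same mapping can be sketched in Python with a sorted threshold list. This is illustrative only: the tuples are simplified stand-ins for `KAFKA_0_10_1_IV0`, `KAFKA_2_1_IV0` and `KAFKA_2_3_IV0`, and `group_metadata_value_version` is a hypothetical name.

```python
import bisect

# Each entry is the first API version that moves to the next schema version.
# Tuples are simplified stand-ins for the ApiVersion constants (assumption).
THRESHOLDS = [(0, 10, 1), (2, 1, 0), (2, 3, 0)]


def group_metadata_value_version(api_version):
    """Return the schema version (0..3) for an API version given as a tuple.

    bisect_right counts how many thresholds are <= api_version, which matches
    the if/else chain: below the first threshold gives 0, at or above the
    last gives 3.
    """
    return bisect.bisect_right(THRESHOLDS, api_version)
```

A table like this keeps the thresholds in one place if another schema version is added later, at the cost of being less explicit than the if/else chain.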
[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
dajac commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r500293553 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -1257,60 +1059,31 @@ object GroupMetadataManager { assignment: Map[String, Array[Byte]], apiVersion: ApiVersion): Array[Byte] = { -val (version, value) = { - if (apiVersion < KAFKA_0_10_1_IV0) -(0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0)) - else if (apiVersion < KAFKA_2_1_IV0) -(1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1)) - else if (apiVersion < KAFKA_2_3_IV0) -(2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2)) - else -(3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3)) -} - -value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse("")) -value.set(GENERATION_KEY, groupMetadata.generationId) -value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull) -value.set(LEADER_KEY, groupMetadata.leaderOrNull) - -if (version >= 2) - value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault) - -val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata => - val memberStruct = value.instance(MEMBERS_KEY) - memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId) - memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId) - memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost) - memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs) - - if (version > 0) -memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs) - - if (version >= 3) -memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull) - - // The group is non-empty, so the current protocol must be defined - val protocol = groupMetadata.protocolName.orNull - if (protocol == null) -throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol") - - val metadata = memberMetadata.metadata(protocol) - memberStruct.set(SUBSCRIPTION_KEY, 
ByteBuffer.wrap(metadata)) - - val memberAssignment = assignment(memberMetadata.memberId) - assert(memberAssignment != null) - - memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment)) - - memberStruct -} - -value.set(MEMBERS_KEY, memberArray.toArray) - -val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) -byteBuffer.putShort(version) -value.writeTo(byteBuffer) -byteBuffer.array() +val version = + if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort + else if (apiVersion < KAFKA_2_1_IV0) 1.toShort + else if (apiVersion < KAFKA_2_3_IV0) 2.toShort + else 3.toShort + +serializeMessage(version, new GroupMetadataValue() + .setProtocolType(groupMetadata.protocolType.getOrElse("")) + .setGeneration(groupMetadata.generationId) + .setProtocol(groupMetadata.protocolName.orNull) + .setLeader(groupMetadata.leaderOrNull) + .setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault) + .setMembers(groupMetadata.allMemberMetadata.map(memberMetadata => Review comment: nit: We tend to use curly braces when the lambda does not fit on the same line. 
## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -1257,60 +1059,31 @@ object GroupMetadataManager { assignment: Map[String, Array[Byte]], apiVersion: ApiVersion): Array[Byte] = { -val (version, value) = { - if (apiVersion < KAFKA_0_10_1_IV0) -(0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0)) - else if (apiVersion < KAFKA_2_1_IV0) -(1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1)) - else if (apiVersion < KAFKA_2_3_IV0) -(2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2)) - else -(3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3)) -} - -value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse("")) -value.set(GENERATION_KEY, groupMetadata.generationId) -value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull) -value.set(LEADER_KEY, groupMetadata.leaderOrNull) - -if (version >= 2) - value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault) - -val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata => - val memberStruct = value.instance(MEMBERS_KEY) - memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId) - memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId) - memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost) - memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs) - - if (version > 0) -
[GitHub] [kafka] kkonstantine merged pull request #9379: MINOR: Annotate test BlockingConnectorTest as integration test
kkonstantine merged pull request #9379: URL: https://github.com/apache/kafka/pull/9379
[GitHub] [kafka] kkonstantine commented on pull request #9379: MINOR: Annotate test BlockingConnectorTest as integration test
kkonstantine commented on pull request #9379: URL: https://github.com/apache/kafka/pull/9379#issuecomment-704314583 JDK 15 was green. The rest of the failures are not relevant.
[GitHub] [kafka] stanislavkozlovski commented on pull request #9371: KAFKA-10510: Validate replication factor consistency on reassignment
stanislavkozlovski commented on pull request #9371: URL: https://github.com/apache/kafka/pull/9371#issuecomment-704300915 Thanks for the PR @piotrrzysko. The changes look good, although I'm concerned that outright blocking this would regress the ability to change the replication factor of a topic. Perhaps we could have a workaround in the form of allowing the reassignment batch to run if it's changing all the partitions for the topic. Another solution, which I like more, is adding a flag to the AlterPartitionReassignments API that allows it to change the replication factor, while keeping this validation. In the long term, I think we may want to consider an API that allows you to alter the replication factor of a topic. cc @ijuma @gwenshap @cmccabe @hachikuji @soondenana do you happen to have any thoughts on the matter?
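To make the trade-off in this comment concrete, here is a rough sketch of a replication factor check on a reassignment batch. It is an assumption about what such a validation could look like, not the logic in the PR: `validate_reassignment` and the dictionary shapes are hypothetical.

```python
def validate_reassignment(current_assignment, reassignment):
    """Return an error message per partition whose replica count would change.

    Both arguments are assumed to map a partition id to a list of broker ids.
    An empty result means the batch keeps every replication factor intact.
    """
    errors = {}
    for partition, new_replicas in reassignment.items():
        current = current_assignment.get(partition)
        if current is None:
            errors[partition] = "unknown partition"
        elif len(new_replicas) != len(current):
            errors[partition] = "replication factor would change from %d to %d" % (
                len(current), len(new_replicas))
    return errors
```

A check this strict is what raises the concern above: with it in place, changing a topic's replication factor through reassignment is no longer possible unless an explicit opt-out (for example the suggested flag) skips the validation.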
[GitHub] [kafka] rajinisivaram opened a new pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rajinisivaram opened a new pull request #9382: URL: https://github.com/apache/kafka/pull/9382 For IBP 2.7 onwards, fetch responses include the diverging epoch and offset if `lastFetchedEpoch` is provided in the fetch request. This PR uses that information for truncation and avoids the additional OffsetForLeaderEpoch requests in followers when `lastFetchedEpoch` is known. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
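As a rough illustration of truncation driven by a diverging epoch, the sketch below computes the offset a follower would truncate to. It is a simplified model under stated assumptions, not the code in this PR: the follower's log is reduced to a map from epoch to the end offset of that epoch, and `truncation_offset` is a hypothetical name.

```python
def truncation_offset(follower_epoch_ends, log_end_offset,
                      diverging_epoch, diverging_end_offset):
    """Return the offset to truncate the follower's log to.

    follower_epoch_ends: dict of epoch -> end offset (exclusive) of that
    epoch in the follower's log (assumed representation).
    diverging_epoch / diverging_end_offset: values reported by the leader
    in the fetch response.
    """
    known = [epoch for epoch in follower_epoch_ends if epoch <= diverging_epoch]
    if not known:
        # No local epoch information to compare against: fall back to the
        # leader's end offset, bounded by what the follower actually has.
        return min(diverging_end_offset, log_end_offset)
    follower_epoch = max(known)
    follower_end = follower_epoch_ends[follower_epoch]
    if follower_epoch == diverging_epoch:
        # Same epoch on both sides: keep only the common prefix.
        return min(follower_end, diverging_end_offset, log_end_offset)
    # The follower never saw the leader's epoch: cut back to the end of the
    # largest earlier epoch it does have.
    return min(follower_end, log_end_offset)
```

Because the leader returns the diverging epoch and offset in the fetch response itself, the follower can compute this locally without a separate OffsetForLeaderEpoch round trip.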
[GitHub] [kafka] javierfreire opened a new pull request #9381: KIP-675: Convert KTable to a KStream using the previous value
javierfreire opened a new pull request #9381: URL: https://github.com/apache/kafka/pull/9381 This gives the possibility of converting a table into a stream using a mapper with the new and old values as arguments. [KIP-675](https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value) KTableImpl tests complete with new methods.
[GitHub] [kafka] omkreddy commented on a change in pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)
omkreddy commented on a change in pull request #9345: URL: https://github.com/apache/kafka/pull/9345#discussion_r500186433 ## File path: clients/src/test/java/org/apache/kafka/test/TestSslUtils.java ## @@ -199,6 +206,156 @@ public static void createKeyStore(String filename, return builder.build(); } +public static void convertToPem(Map sslProps, boolean writeToFile, boolean encryptPrivateKey) throws Exception { +String tsPath = (String) sslProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG); +String tsType = (String) sslProps.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG); +Password tsPassword = (Password) sslProps.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG); +Password trustCerts = (Password) sslProps.remove(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG); +if (trustCerts == null && tsPath != null) { +trustCerts = exportCertificates(tsPath, tsPassword, tsType); +} +if (trustCerts != null) { +if (tsPath == null) { +tsPath = File.createTempFile("truststore", ".pem").getPath(); +sslProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsPath); +} +sslProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, PEM_TYPE); +if (writeToFile) +writeToFile(tsPath, trustCerts); +else { +sslProps.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, trustCerts); +sslProps.remove(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG); +} +} + +String ksPath = (String) sslProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); +Password certChain = (Password) sslProps.remove(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG); +Password key = (Password) sslProps.remove(SslConfigs.SSL_KEYSTORE_KEY_CONFIG); +if (certChain == null && ksPath != null) { +String ksType = (String) sslProps.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG); +Password ksPassword = (Password) sslProps.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); +Password keyPassword = (Password) sslProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG); +certChain = exportCertificates(ksPath, ksPassword, ksType); +Password pemKeyPassword = encryptPrivateKey ? 
keyPassword : null; +key = exportPrivateKey(ksPath, ksPassword, keyPassword, ksType, pemKeyPassword); +if (!encryptPrivateKey) +sslProps.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); +} else if (!encryptPrivateKey) { Review comment: Can we remove empty else block? ## File path: clients/src/test/java/org/apache/kafka/common/network/CertStores.java ## @@ -54,13 +60,30 @@ public CertStores(boolean server, String commonName, InetAddress hostAddress) th } private CertStores(boolean server, String commonName, TestSslUtils.CertificateBuilder certBuilder) throws Exception { +this(server, commonName, "RSA", certBuilder, false); +} + +private CertStores(boolean server, String commonName, String keyAlgorithm, TestSslUtils.CertificateBuilder certBuilder, boolean usePem) throws Exception { String name = server ? "server" : "client"; Mode mode = server ? Mode.SERVER : Mode.CLIENT; -File truststoreFile = File.createTempFile(name + "TS", ".jks"); -sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name, commonName, certBuilder); +File truststoreFile = usePem ? null : File.createTempFile(name + "TS", ".jks"); +sslConfig = new SslConfigsBuilder(mode) +.useClientCert(!server) +.certAlias(name) +.cn(commonName) +.createNewTrustStore(truststoreFile) +.certBuilder(certBuilder) +.algorithm(keyAlgorithm) +.usePem(usePem) +.build(); } + public Map getTrustingConfig(CertStores truststoreConfig) { +return getTrustingConfig(truststoreConfig, false); +} + +public Map getTrustingConfig(CertStores truststoreConfig, boolean usePemCerts) { Review comment: unused usePemCerts? 
## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -923,9 +926,12 @@ object KafkaConfig { val SslKeystoreLocationDoc = SslConfigs.SSL_KEYSTORE_LOCATION_DOC val SslKeystorePasswordDoc = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC val SslKeyPasswordDoc = SslConfigs.SSL_KEY_PASSWORD_DOC + val SslKeystoreKeyDoc = SslConfigs.SSL_KEYSTORE_KEY_DOC + val SslKeystoreCertificateChainDoc = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG Review comment: SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG => SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -923,9 +926,12 @@ object
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17208688#comment-17208688 ] Sandeep commented on KAFKA-5998: [~ableegoldman], yes, I was trying it out on my local machine. The tmp/ directory is the default configuration. For production, this will be deployed in separate Docker containers. > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Assignee: John Roesler >Priority: Critical > Fix For: 2.2.2, 2.4.0, 2.3.1 > > Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, > exc.txt, props.txt, streams.txt > > > I have one Kafka broker and one Kafka stream running. It has been running > for two days under a load of around 2500 msgs per second. On the third day I am > getting the exception below for some of the partitions. I have 16 partitions; only > 0_0 and 0_1 give this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > 
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > 
/data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >