Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1548103258 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -391,4 +420,15 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + +private void retryUntilEqual(List expected, Supplier> outputSupplier) { +try { +TestUtils.waitForCondition( +() -> expected.equals(outputSupplier.get()), +"TopicOffsets did not match. Expected: " + expectedTestTopicOffsets() + ", but was: " + outputSupplier.get() Review Comment: Ok done. Let's see what we get from there =) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]
gharris1727 commented on PR #15639: URL: https://github.com/apache/kafka/pull/15639#issuecomment-2032352619 Hi @chia7712 This [test is disabled in CI](https://github.com/apache/kafka/blob/ee61bb721eecb0404929f125fe43392f3d024453/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala#L622), but fails on trunk if it is re-enabled. I believe that this problem would also be apparent if one tried to use the dynamic reconfiguration feature to change the default log configs. AFAIU the earlier PR completely breaks that feature. We should prioritize https://issues.apache.org/jira/browse/KAFKA-6527 to re-enable this test, as it would have caught this regression.
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1548067397 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -391,4 +420,15 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + +private void retryUntilEqual(List expected, Supplier> outputSupplier) { +try { +TestUtils.waitForCondition( +() -> expected.equals(outputSupplier.get()), +"TopicOffsets did not match. Expected: " + expectedTestTopicOffsets() + ", but was: " + outputSupplier.get() Review Comment: Yep, I looped the test with this PR 1000 times and every run passed :( It seems we need to reproduce the failure on our CI.
[jira] [Updated] (KAFKA-16418) Review/split long-running admin client integration tests
[ https://issues.apache.org/jira/browse/KAFKA-16418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16418: --- Summary: Review/split long-running admin client integration tests (was: Split long-running admin client integration tests) > Review/split long-running admin client integration tests > > > Key: KAFKA-16418 > URL: https://issues.apache.org/jira/browse/KAFKA-16418 > Project: Kafka > Issue Type: Task > Components: clients >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > > Review PlaintextAdminIntegrationTest and attempt to split it to allow for > parallelization and improve build times. This test is the longest-running > integration test in kafka.api, so a similar approach to what has been done > with the consumer tests in PlaintextConsumerTest should be a good > improvement. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1548061572 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -391,4 +420,15 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + +private void retryUntilEqual(List expected, Supplier> outputSupplier) { +try { +TestUtils.waitForCondition( +() -> expected.equals(outputSupplier.get()), +"TopicOffsets did not match. Expected: " + expectedTestTopicOffsets() + ", but was: " + outputSupplier.get() Review Comment: So something like the below? Print out the final offsets in the error message for debugging?
```
TestUtils.waitForCondition(
    () -> expected.equals(outputSupplier.get()),
    "TopicOffsets did not match. Expected: " + expectedTestTopicOffsets() + ", but was: " + outputSupplier.get() + ". Final offsets: " + getFinalOffsets()
);
```
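For readers following the retry discussion, here is a minimal, self-contained sketch of a wait-until-equal helper. It deliberately does not use Kafka's `TestUtils`; the class `RetrySketch`, its parameters, and the `extraDetails` supplier are hypothetical names chosen for illustration. One point it tries to show: when the failure message is passed as an already-concatenated `String`, expressions such as `outputSupplier.get()` are evaluated before waiting starts, whereas building the message only on timeout reports the last observed value.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a wait-until-equal test helper; this is NOT Kafka's TestUtils. */
final class RetrySketch {
    private RetrySketch() { }

    /**
     * Polls {@code actual} until it equals {@code expected} or {@code maxWaitMs} elapses.
     * The failure message is built only on timeout, so it reflects the last polled value
     * and the diagnostics available at that moment.
     */
    static <T> void retryUntilEqual(T expected, Supplier<T> actual, Supplier<String> extraDetails,
                                    long maxWaitMs, long pollMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        T last = actual.get();
        while (!expected.equals(last)) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("TopicOffsets did not match. Expected: " + expected
                    + ", but was: " + last + ". Final offsets: " + extraDetails.get());
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail fast instead of swallowing it.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for offsets to match", e);
            }
            last = actual.get();
        }
    }
}
```

In a real test, `extraDetails` could wrap an admin-client lookup of the latest offsets, so the extra request is only made when the condition never becomes true.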
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1547986293 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -391,4 +420,15 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + +private void retryUntilEqual(List expected, Supplier> outputSupplier) { +try { +TestUtils.waitForCondition( +() -> expected.equals(outputSupplier.get()), +"TopicOffsets did not match. Expected: " + expectedTestTopicOffsets() + ", but was: " + outputSupplier.get() Review Comment: Could you use `admin` to list the latest offsets after it fails? For example:
```java
private Set<Row> parse() throws ExecutionException, InterruptedException {
    try (Admin admin = cluster.createAdminClient()) {
        // Collect every topic name, including internal topics.
        Set<String> topics = admin.listTopics(new ListTopicsOptions().listInternal(true)).listings().get()
            .stream().map(TopicListing::name).collect(Collectors.toSet());
        // Request the latest offset for every partition of every topic.
        Map<TopicPartition, OffsetSpec> offsetRequest = admin.describeTopics(topics)
            .allTopicNames().get().entrySet().stream().flatMap(entry -> entry.getValue().partitions()
                .stream().map(p -> new AbstractMap.SimpleImmutableEntry<>(
                    new TopicPartition(entry.getKey(), p.partition()), OffsetSpec.latest())))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        // Turn each (topic, partition, offset) result into a Row.
        return admin.listOffsets(offsetRequest).all().get().entrySet().stream()
            .map(entry -> new Row(entry.getKey().topic(), entry.getKey().partition(), entry.getValue().offset()))
            .collect(Collectors.toSet());
    }
}
```
[jira] [Updated] (KAFKA-16297) Race condition while promoting future replica can lead to partition unavailability.
[ https://issues.apache.org/jira/browse/KAFKA-16297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-16297: Fix Version/s: 3.8.0 > Race condition while promoting future replica can lead to partition > unavailability. > --- > > Key: KAFKA-16297 > URL: https://issues.apache.org/jira/browse/KAFKA-16297 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Affects Versions: 3.7.0 >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > KIP-858 proposed that when a directory failure occurs after changing the > assignment of a replica that's moved between two directories in the same > broker, but before the future replica promotion completes, the broker should > reassign the replica to inform the controller of its correct status. But this > hasn't yet been implemented, and without it this failure may lead to > indefinite partition unavailability. > Example scenario: > # A broker which leads partition P receives a request to alter the replica > from directory A to directory B. > # The broker creates a future replica in directory B and starts a replica > fetcher. > # Once the future replica first catches up, the broker queues a reassignment > to inform the controller of the directory change. > # The next time the replica catches up, the broker briefly blocks appends > and promotes the replica. However, before the promotion is attempted, > directory A fails. > # The controller was informed that P is now in directory B before it > received the notification that directory A has failed, so it does not elect a > new leader, and as long as the broker is online, partition P remains > unavailable.
[jira] [Updated] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically
[ https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-16234: Fix Version/s: 3.8.0 > Log directory failure re-creates partitions in another logdir automatically > --- > > Key: KAFKA-16234 > URL: https://issues.apache.org/jira/browse/KAFKA-16234 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Critical > Fix For: 3.8.0, 3.7.1 > > > With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes > in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. > Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old > and new topicIds to decide if it needs to create a new log. > The getter for {{Partition::topicId}} relies on retrieving the topicId from > {{log}} field or {{{}logManager.currentLogs{}}}. The former is set to > {{None}} when a partition is marked offline and the key for the partition is > removed from the latter by {{{}LogManager::handleLogDirFailure{}}}. > Therefore, topicId for a partition marked offline always returns {{None}} > and new logs for all partitions in a failed log directory are always created > on another disk. > The broker will fail to restart after the failed disk is repaired because > the same partitions will occur in two different directories. The error does > however inform the operator to remove the partitions from the disk that > failed, which should help with broker startup. > We can avoid this with KAFKA-16212 but in the short-term, an immediate > solution can be to have {{Partition}} object accept {{Option[TopicId]}} in > its constructor and have it fall back to {{log}} or {{logManager}} if it's > unset.
[jira] [Updated] (KAFKA-16082) Broker recreates reassigned partition after logdir failure
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-16082: Fix Version/s: 3.8.0 > Broker recreates reassigned partition after logdir failure > -- > > Key: KAFKA-16082 > URL: https://issues.apache.org/jira/browse/KAFKA-16082 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Affects Versions: 3.7.0 >Reporter: Proven Provenzano >Assignee: Gaurav Narula >Priority: Critical > Fix For: 3.8.0, 3.7.1 > > > There is a possible data loss scenario > when using JBOD, > when moving the partition leader log from one directory to another on the > same broker, > when after the destination log has caught up to the source log and after the > broker has sent an update to the partition assignment > if the broker accepts and commits a new record for the partition and then the > broker restarts and the original partition leader log is lost > then the destination log would not contain the new record.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2032154288 Hey @Phuc-Hong-Tran , any update on this one? Thanks!
Re: [PR] KAFKA-16039: RecordHeaders supports the addAll method [kafka]
funky-eyes commented on PR #15034: URL: https://github.com/apache/kafka/pull/15034#issuecomment-2032110132 > @funky-eyes let me know if you got a chance to review my comment Hello, thank you for the reminder. I submitted this PR to make the functions of RecordHeaders and List consistent, but I do not have permission to submit a KIP, so I don't know how to move this PR forward. If you know, please tell me. Thank you.
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1547934887 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -391,4 +447,15 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + +private void retryUntilEqual(List expected, List output) { Review Comment: > The build still contains failed tests even when retry is implemented... looks like it's due to a race condition when executing the tests? All failed tests are running in KRaft mode, so I agree up to a point...
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-2032098989 Just merged https://github.com/apache/kafka/pull/15510 -- can we move forward with this PR (maybe by rebasing it to see if any tests break?)?
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJoinBreak-flags out of the loop [kafka]
mjsax commented on PR #15510: URL: https://github.com/apache/kafka/pull/15510#issuecomment-2032092184 Thanks for the fix @VictorvandenHoven! Merged to `trunk`.
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJoinBreak-flags out of the loop [kafka]
mjsax merged PR #15510: URL: https://github.com/apache/kafka/pull/15510
Re: [PR] KAFKA-16316: Configure reprocessing with addGlobalStateStore [kafka]
mjsax commented on code in PR #15619: URL: https://github.com/apache/kafka/pull/15619#discussion_r1543277915 ## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java: ## @@ -596,7 +597,9 @@ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder s * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} * @return itself * @throws TopologyException if the processor of state is already registered + * @deprecated Since 3.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier, boolean)} or {@link #addGlobalStore(StoreBuilder, String, String)} instead. Review Comment: ```suggestion * @deprecated Since 3.8.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier, boolean)} or {@link #addGlobalStore(StoreBuilder, String, String)} instead. ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java: ## @@ -613,6 +616,74 @@ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder< return this; } +/** + * Adds a global {@link StateStore} to the topology. + * The {@link StateStore} sources its data from all partitions of the provided input topic. + * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. + * + * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions + * of the input topic. + * + * The provided {@link ProcessorSupplier} will be used to create an + * {@link Processor} that will receive all records forwarded from the {@link SourceNode}. + * The supplier should always generate a new instance. Creating a single {@link Processor} object + * and returning the same object reference in {@link ProcessorSupplier#get()} is a + * violation of the supplier pattern and leads to runtime exceptions. + * This {@link Processor} should be used to keep the {@link StateStore} up-to-date. 
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * + * It is not required to connect a global store to the {@link Processor Processors}, + * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default. + * + * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null} + * @param topic the topic to source the data from + * @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null} + * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} + * @param reprocessOnRestore restore by reprocessing the data using a processor supplied by stateUpdateSupplier or loads the data in byte for byte + * @return itself + * @throws TopologyException if the processor of state is already registered + */ +public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder, + final String topic, + final Consumed consumed, + final ProcessorSupplier stateUpdateSupplier, + final boolean reprocessOnRestore) { +Objects.requireNonNull(storeBuilder, "storeBuilder can't be null"); +Objects.requireNonNull(consumed, "consumed can't be null"); +internalStreamsBuilder.addGlobalStore( +new StoreBuilderWrapper(storeBuilder), +topic, +new ConsumedInternal<>(consumed), +stateUpdateSupplier, +reprocessOnRestore +); +return this; +} + +/** + * Adds a global {@link StateStore} to the topology. + * The {@link StateStore} sources its data from all partitions of the provided input topic. + * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. 
+ * + * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null} + * @param topic the topic to source the data from + * @return itself + * @throws TopologyException if the processor of state is already registered + */ +public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder, + final String topic, + final Consumed consumed) { Review Comment: nit: fix indentation ## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java: ## @@ -613,6 +616,74 @@ public synchronized StreamsBuilder addGlobalS
[jira] [Updated] (KAFKA-16458) Add contains method in KeyValue store interface
[ https://issues.apache.org/jira/browse/KAFKA-16458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16458: Labels: needs-kip (was: ) > Add contains method in KeyValue store interface > --- > > Key: KAFKA-16458 > URL: https://issues.apache.org/jira/browse/KAFKA-16458 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: Ayoub Omari >Priority: Minor > Labels: needs-kip > > In some stream processors, we sometimes just want to check if a key exists in > the state store or not. > > I find calling .get() and checking if the return value is null a little bit > verbose > {code:java} > if (store.get(key) != null) { > }{code} > > But I am not sure if it is on purpose that we would like to keep the store > interface simple.
[jira] [Created] (KAFKA-16458) Add contains method in KeyValue store interface
Ayoub Omari created KAFKA-16458: --- Summary: Add contains method in KeyValue store interface Key: KAFKA-16458 URL: https://issues.apache.org/jira/browse/KAFKA-16458 Project: Kafka Issue Type: Wish Components: streams Reporter: Ayoub Omari In some stream processors, we sometimes just want to check if a key exists in the state store or not. I find calling .get() and checking if the return value is null a little bit verbose {code:java} if (store.get(key) != null) { }{code} But I am not sure if it is on purpose that we would like to keep the store interface simple.
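To make the wish above concrete, here is a small sketch of how a `contains` convenience method could sit on top of `get`. The interface `SimpleReadOnlyStore` and the class `MapBackedStore` are hypothetical and exist only for this illustration; they are not the Kafka Streams store interfaces, and any real change to those would go through a KIP, as the `needs-kip` label indicates.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal read-only store; NOT the Kafka Streams interface. */
interface SimpleReadOnlyStore<K, V> {
    /** Returns the value for the key, or null when the key is absent. */
    V get(K key);

    /** Sketch of the requested convenience method: true when get(key) is non-null. */
    default boolean contains(K key) {
        return get(key) != null;
    }
}

/** Trivial in-memory implementation used only to exercise the default method. */
final class MapBackedStore<K, V> implements SimpleReadOnlyStore<K, V> {
    private final Map<K, V> data = new HashMap<>();

    void put(K key, V value) {
        data.put(key, value);
    }

    @Override
    public V get(K key) {
        return data.get(key);
    }
}
```

A default method keeps existing implementations compiling unchanged, at the cost of still performing a full `get` (including value deserialization in a real store); an implementation that can check key existence more cheaply could override it.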
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1547789497 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -391,4 +447,15 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + +private void retryUntilEqual(List expected, List output) { Review Comment: @chia7712 The build still contains failed tests even when retry is implemented... looks like it's due to a race condition when executing the tests?
Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]
Joker-5 commented on code in PR #15642: URL: https://github.com/apache/kafka/pull/15642#discussion_r1547632847 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ## @@ -343,7 +346,7 @@ public void testConnectorHasConverterWithNoSuitableConstructor() throws Interrup @Test public void testConnectorHasConverterThatThrowsExceptionOnInstantiation() throws InterruptedException { Map config = defaultSinkConnectorProps(); -config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName()); +config.put(KEY_CONVERTER_CLASS_CONFIG, TestPlugins.TestPlugin.ALWAYS_THROW_EXCEPTION.className()); Review Comment: At the beginning I used `BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR` in this case, but I thought `ALWAYS_THROW_EXCEPTION` had stronger semantics, which is why I changed my mind. Now that I think about the case again, I think `BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR` has finer-grained semantics and is more suitable for the case. I've changed it. Thanks so much for pointing that out.
Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]
Joker-5 commented on code in PR #15642: URL: https://github.com/apache/kafka/pull/15642#discussion_r1547690392 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ## @@ -69,6 +69,9 @@ public static void setup() { Map workerProps = new HashMap<>(); workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID); +// Work around a circular-dependency in TestPlugins. +TestPlugins.pluginPath(); Review Comment: If we don't add this, methods which use `TestPlugins` will have erroneous behavior. In particular:
```
[2024-04-02 19:16:25,977] ERROR Could not set up plugin test jars (org.apache.kafka.connect.runtime.isolation.TestPlugins:258)
java.lang.NullPointerException
    at org.apache.kafka.connect.runtime.isolation.TestPlugins$TestPlugin.values(TestPlugins.java:69)
    at org.apache.kafka.connect.runtime.isolation.TestPlugins.<clinit>(TestPlugins.java:251)
    at org.apache.kafka.connect.runtime.isolation.TestPlugins$TestPlugin.<clinit>(TestPlugins.java:128)
    at org.apache.kafka.connect.integration.ConnectorValidationIntegrationTest.testConnectorHasConverterWithNoSuitableConstructor(ConnectorValidationIntegrationTest.java:337)
    ...
```
I found the reason behind it:
> TL;DR The error occurs because of an initialization cycle: the inner class (some fields depend on the outer class in its `<clinit>` method) -> the outer class (a line in its static code block depends on the inner class in its `<clinit>` method) -> the inner class.

1. There's a `private final Predicate<String> removeRuntimeClasses` field in the inner class `org.apache.kafka.connect.runtime.isolation.TestPlugins.TestPlugin`.
2. Some enum constants, such as `BAD_PACKAGING_MISSING_SUPERCLASS`, set the field from (1) to `private static final Predicate<String> REMOVE_CLASS_FILTER = s -> s.contains("NonExistentInterface")`, which is declared in the outer class (`org.apache.kafka.connect.runtime.isolation.TestPlugins`).
3. When a test method such as `testConnectorHasConverterWithNoSuitableConstructor` runs, the JVM calls the `<clinit>` method to initialize the inner class (`org.apache.kafka.connect.runtime.isolation.TestPlugins.TestPlugin`).
4. Because some enum constants initialize `removeRuntimeClasses` from the outer class's `REMOVE_CLASS_FILTER`, at that point the JVM has to call the `<clinit>` method to initialize the outer class (`org.apache.kafka.connect.runtime.isolation.TestPlugins`).
5. The outer class has a static code block which uses the inner class. But the inner class has not finished initializing, so the circular dependency kicks in, which results in that error log.
```java
static {
    Throwable err = null;
    Map pluginJars = new HashMap<>();
    try {
        for (TestPlugin testPlugin : TestPlugin.values()) { // see this line
            if (pluginJars.containsKey(testPlugin.resourceDir())) {
                log.debug("Skipping recompilation of " + testPlugin.resourceDir());
            }
            pluginJars.put(testPlugin.resourceDir(), createPluginJar(testPlugin.resourceDir(), testPlugin.removeRuntimeClasses()));
        }
    } catch (Throwable e) {
        log.error("Could not set up plugin test jars", e);
        err = e;
    }
    PLUGIN_JARS = Collections.unmodifiableMap(pluginJars);
    INITIALIZATION_EXCEPTION = err;
}
```
So that's why I added the code.
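The initialization cycle described in this comment can be reproduced outside Kafka. The following is a minimal sketch under stated assumptions: the classes `EnumFirst`, `OuterFirst`, and `InitOrderDemo` are hypothetical and only mimic the shape of `TestPlugins` and `TestPlugins.TestPlugin` (an outer class whose static block calls `values()` on a nested enum, and enum constants that read a static field of the outer class). Two structurally identical outer classes are used because a class is initialized only once per class loader, so each initialization order needs its own pair.

```java
import java.util.function.Predicate;

/** Touched through its nested enum first: the outer static block runs mid-way through enum init. */
class EnumFirst {
    static final Predicate<String> FILTER = s -> s.contains("NonExistentInterface");
    static final Throwable INIT_ERROR;

    static {
        Throwable err = null;
        try {
            // If the enum began initializing first, its backing array is not assigned yet,
            // so values() fails with a NullPointerException.
            Plugin.values();
        } catch (Throwable t) {
            err = t;
        }
        INIT_ERROR = err;
    }

    enum Plugin {
        ONE(FILTER); // reading FILTER triggers initialization of EnumFirst

        final Predicate<String> filter;

        Plugin(Predicate<String> filter) {
            this.filter = filter;
        }
    }
}

/** Same shape, but touched through the outer class first: no failure. */
class OuterFirst {
    static final Predicate<String> FILTER = s -> s.contains("NonExistentInterface");
    static final Throwable INIT_ERROR;

    static {
        Throwable err = null;
        try {
            Plugin.values(); // triggers and completes enum initialization; FILTER is already set
        } catch (Throwable t) {
            err = t;
        }
        INIT_ERROR = err;
    }

    enum Plugin {
        ONE(FILTER);

        final Predicate<String> filter;

        Plugin(Predicate<String> filter) {
            this.filter = filter;
        }
    }
}

final class InitOrderDemo {
    private InitOrderDemo() { }

    /** Initializes the enum before its outer class; returns the error captured by the outer static block. */
    static Throwable enumFirst() {
        EnumFirst.Plugin.ONE.name();
        return EnumFirst.INIT_ERROR;
    }

    /** Initializes the outer class first, analogous to calling an outer-class method up front. */
    static Throwable outerFirst() {
        Throwable err = OuterFirst.INIT_ERROR;
        OuterFirst.Plugin.ONE.name();
        return err;
    }
}
```

In this sketch `enumFirst()` returns the captured `NullPointerException`, while `outerFirst()` returns `null`. That second ordering is what the `TestPlugins.pluginPath()` call in `setup()` appears to achieve: it forces the outer class to initialize before any test touches the enum.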
[PR] KAFKA-16457 Useless import class [kafka]
rykovsi opened a new pull request, #15646: URL: https://github.com/apache/kafka/pull/15646 [KAFKA-16457](https://issues.apache.org/jira/browse/KAFKA-16457) Useless import class in SslConfigs.java import org.apache.kafka.common.config.ConfigDef.Type; ... .define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_KEY_DOC) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-16457) Useless import class
Svyatoslav created KAFKA-16457: -- Summary: Useless import class Key: KAFKA-16457 URL: https://issues.apache.org/jira/browse/KAFKA-16457 Project: Kafka Issue Type: Task Components: config Affects Versions: 3.7.0 Reporter: Svyatoslav Useless import class in SslConfigs.java {code:java} import org.apache.kafka.common.config.ConfigDef.Type; .define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_KEY_DOC){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16457) Useless import class
[ https://issues.apache.org/jira/browse/KAFKA-16457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Svyatoslav updated KAFKA-16457: --- Description: Useless import class in SslConfigs.java {code:java} import org.apache.kafka.common.config.ConfigDef.Type; ... .define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_KEY_DOC){code} was: Useless import class in SslConfigs.java {code:java} import org.apache.kafka.common.config.ConfigDef.Type; .define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_KEY_DOC){code} > Useless import class > > > Key: KAFKA-16457 > URL: https://issues.apache.org/jira/browse/KAFKA-16457 > Project: Kafka > Issue Type: Task > Components: config >Affects Versions: 3.7.0 >Reporter: Svyatoslav >Priority: Trivial > > Useless import class in SslConfigs.java > > {code:java} > import org.apache.kafka.common.config.ConfigDef.Type; > ... > .define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, Type.PASSWORD, null, > ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_KEY_DOC){code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]
Joker-5 commented on code in PR #15642: URL: https://github.com/apache/kafka/pull/15642#discussion_r1547632847 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ## @@ -343,7 +346,7 @@ public void testConnectorHasConverterWithNoSuitableConstructor() throws Interrup @Test public void testConnectorHasConverterThatThrowsExceptionOnInstantiation() throws InterruptedException { Map config = defaultSinkConnectorProps(); -config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName()); +config.put(KEY_CONVERTER_CLASS_CONFIG, TestPlugins.TestPlugin.ALWAYS_THROW_EXCEPTION.className()); Review Comment: At first I used `BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR` here, but I thought `ALWAYS_THROW_EXCEPTION` had stronger semantics, so I changed my mind. Thinking about it again, `BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR` has finer-grained semantics that suit this case better, so I will change it. Thanks so much for pointing that out.
Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]
Joker-5 commented on code in PR #15642: URL: https://github.com/apache/kafka/pull/15642#discussion_r1547609546 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ## @@ -331,7 +334,7 @@ public void testConnectorHasAbstractConverter() throws InterruptedException { @Test public void testConnectorHasConverterWithNoSuitableConstructor() throws InterruptedException { Map config = defaultSinkConnectorProps(); -config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName()); +config.put(KEY_CONVERTER_CLASS_CONFIG, TestPlugins.TestPlugin.ALWAYS_THROW_EXCEPTION.className()); Review Comment: oops, I misread the code, now I've corrected it.
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
nizhikov commented on PR #15645: URL: https://github.com/apache/kafka/pull/15645#issuecomment-2031645630 Hello @chia7712 This is the first PR that moves parts of ConfigCommand to Java. It contains the rewritten `ConfigCommandIntegrationTest`. Please take a look.
[PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
nizhikov opened a new pull request, #15645: URL: https://github.com/apache/kafka/pull/15645 This is the first part of the #15417 refactoring. The intention is to split the big PR into parts to simplify review. This PR contains `ConfigCommandIntegrationTest` rewritten in Java. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-16148) Implement GroupMetadataManager#onUnloaded
[ https://issues.apache.org/jira/browse/KAFKA-16148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16148. - Fix Version/s: 3.8.0 Resolution: Fixed > Implement GroupMetadataManager#onUnloaded > - > > Key: KAFKA-16148 > URL: https://issues.apache.org/jira/browse/KAFKA-16148 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Major > Fix For: 3.8.0 > > > complete all awaiting futures with NOT_COORDINATOR (for classic group) > transition all groups to DEAD. > Cancel all timers related to the unloaded group metadata manager -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]
dajac merged PR #15446: URL: https://github.com/apache/kafka/pull/15446
Re: [PR] KAFKA-16039: RecordHeaders supports the addAll method [kafka]
vamossagar12 commented on PR #15034: URL: https://github.com/apache/kafka/pull/15034#issuecomment-2031571928 @funky-eyes let me know if you got a chance to review my comment
Re: [PR] [WIP] KAFKA-16383: fix flaky IdentityReplicationIntegrationTest .testReplicateFromLatest [kafka]
vamossagar12 commented on PR #15556: URL: https://github.com/apache/kafka/pull/15556#issuecomment-2031549949 @johnnychhsu, for completeness' sake, the noise in the logs related to the private constructor is being fixed in this PR: https://github.com/apache/kafka/pull/15642
Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]
vamossagar12 commented on code in PR #15642: URL: https://github.com/apache/kafka/pull/15642#discussion_r1547506743 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ## @@ -343,7 +346,7 @@ public void testConnectorHasConverterWithNoSuitableConstructor() throws Interrup @Test public void testConnectorHasConverterThatThrowsExceptionOnInstantiation() throws InterruptedException { Map config = defaultSinkConnectorProps(); -config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName()); +config.put(KEY_CONVERTER_CLASS_CONFIG, TestPlugins.TestPlugin.ALWAYS_THROW_EXCEPTION.className()); Review Comment: We could probably use `BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR` here, and in `testConnectorHasHeaderConverterThatThrowsExceptionOnInstantiation` as well? ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ## @@ -69,6 +69,9 @@ public static void setup() { Map workerProps = new HashMap<>(); workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID); +// Work around a circular-dependency in TestPlugins. +TestPlugins.pluginPath(); Review Comment: Why do we need this? ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ## @@ -331,7 +334,7 @@ public void testConnectorHasAbstractConverter() throws InterruptedException { @Test public void testConnectorHasConverterWithNoSuitableConstructor() throws InterruptedException { Map config = defaultSinkConnectorProps(); -config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName()); +config.put(KEY_CONVERTER_CLASS_CONFIG, TestPlugins.TestPlugin.ALWAYS_THROW_EXCEPTION.className()); Review Comment: I think we should use `BAD_PACKAGING_DEFAULT_CONSTRUCTOR_PRIVATE_CONNECTOR` in this case as well?
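The two tests under review exercise different failure modes: a class with no usable constructor versus a constructor that throws. The following sketch shows that difference with plain reflection. It is an assumption-laden illustration, not Connect's plugin-loading code: `InstantiationSketch`, `PrivateCtor`, `ThrowingCtor`, and `tryInstantiate` are hypothetical names introduced only for this example.

```java
import java.lang.reflect.InvocationTargetException;

final class InstantiationSketch {
    // Hypothetical plugin whose only constructor is private.
    public static class PrivateCtor {
        private PrivateCtor() { }
    }

    // Hypothetical plugin whose public constructor always fails.
    public static class ThrowingCtor {
        public ThrowingCtor() {
            throw new IllegalStateException("constructor always throws");
        }
    }

    // Tries to build an instance through a public no-arg constructor and
    // reports which kind of failure, if any, occurred.
    static String tryInstantiate(Class<?> type) {
        try {
            type.getConstructor().newInstance();
            return "ok";
        } catch (NoSuchMethodException e) {
            // getConstructor() only sees public constructors.
            return "no suitable constructor";
        } catch (InvocationTargetException e) {
            // The constructor was found and invoked, but its body threw.
            return "constructor threw: " + e.getCause().getMessage();
        } catch (ReflectiveOperationException e) {
            return "other failure: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInstantiate(PrivateCtor.class));
        System.out.println(tryInstantiate(ThrowingCtor.class));
    }
}
```

Because the two cases surface as different exceptions, a test plugin that matches the exact failure being tested keeps each test's intent distinct.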
[jira] [Commented] (KAFKA-16456) Can't stop kafka debug logs
[ https://issues.apache.org/jira/browse/KAFKA-16456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833062#comment-17833062 ] Chia-Ping Tsai commented on KAFKA-16456: the debug message "Received {} response from node {} for request with header {}: {}" is common as it is talking about the "response" from server. You can set `log4j.logger.org.apache.kafka.clients.NetworkClient=INFO` to avoid the verbose debug-level message from specific package. > Can't stop kafka debug logs > --- > > Key: KAFKA-16456 > URL: https://issues.apache.org/jira/browse/KAFKA-16456 > Project: Kafka > Issue Type: Bug > Components: logging >Affects Versions: 3.6.0 >Reporter: Rajan Choudhary >Priority: Major > > I am getting kafka debug logs, which are flooding our logs. Sample below > > {code:java} > 09:50:38.073 [kafka-producer-network-thread | maximo-mp] DEBUG > org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, > transactionalId=sqout-3664816744674374805414] Received API_VERSIONS response > from node 5 for request with header RequestHeader(apiKey=API_VERSIONS, > apiVersion=3, clientId=maximo-mp, correlationId=8): > ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, > minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, > maxVersion=13), ApiVersion(apiKey=2, minVersion=0, maxVersion=7), > ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, > minVersion=0, maxVersion=5), ApiVersion(apiKey=5, minVersion=0, > maxVersion=3), ApiVersion(apiKey=6, minVersion=0, maxVersion=7), > ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, > minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, > maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), > ApiVersion(apiKey=11, minVersion=0, maxVersion=7), ApiVersion(apiKey=12, > minVersion=0, m... 
> 09:50:38.073 [kafka-producer-network-thread | maximo-mp] DEBUG > org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, > transactionalId=sqout-3664816744674374805414] Node 5 has finalized features > epoch: 1, finalized features: [], supported features: [], API versions: > (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 13 [usable: 12], > ListOffsets(2): 0 to 7 [usable: 6], Metadata(3): 0 to 12 [usable: 11], > LeaderAndIsr(4): 0 to 5 [usable: 5], StopReplica(5): 0 to 3 [usable: 3], > UpdateMetadata(6): 0 to 7 [usable: 7], ControlledShutdown(7): 0 to 3 [usable: > 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], > FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], > Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], > SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], > ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], > ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], > Del... > 09:50:38.074 [kafka-producer-network-thread | maximo-mp] DEBUG > org.apache.kafka.clients.producer.internals.TransactionManager - [Producer > clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] ProducerId > of partition sqout-0 set to 43458621 with epoch 0. Reinitialize sequence at > beginning. 
> 09:50:38.074 [kafka-producer-network-thread | maximo-mp] DEBUG > org.apache.kafka.clients.producer.internals.RecordAccumulator - [Producer > clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Assigned > producerId 43458621 and producerEpoch 0 to batch with base sequence 0 being > sent to partition sqout-0 > 09:50:38.075 [kafka-producer-network-thread | maximo-mp] DEBUG > org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, > transactionalId=sqout-3664816744674374805414] Sending PRODUCE request with > header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=maximo-mp, > correlationId=9) and timeout 3 to node 5: > {acks=-1,timeout=3,partitionSizes=[sqout-0=4181]} > 09:50:38.095 [kafka-producer-network-thread | maximo-mp] DEBUG > org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, > transactionalId=sqout-3664816744674374805414] Received PRODUCE response from > node 5 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, > clientId=maximo-mp, correlationId=9): > ProduceResponseData(responses=[TopicProduceResponse(name='sqout', > partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, > baseOffset=796494, logAppendTimeMs=-1, logStartOffset=768203, > recordErrors=[], errorMessage=null)])], throttleTimeMs=0) > 09:50:38.095 [kafka-producer-network-thread | maximo-mp] DEBUG > org.apache.kafka.clients.producer.internals.TransactionManager - [Producer > clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Produce
[PR] Add s390x support to CI [kafka]
Prashant-Jagtap opened a new pull request, #15644: URL: https://github.com/apache/kafka/pull/15644 We would like to add s390x support to the existing CI. This adds an s390x build stage that uses JDK 21 and Scala 2.13. The build and tests were verified locally. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-16456) Can't stop kafka debug logs
Rajan Choudhary created KAFKA-16456: --- Summary: Can't stop kafka debug logs Key: KAFKA-16456 URL: https://issues.apache.org/jira/browse/KAFKA-16456 Project: Kafka Issue Type: Bug Components: logging Affects Versions: 3.6.0 Reporter: Rajan Choudhary I am getting kafka debug logs, which are flooding our logs. Sample below {code:java} 09:50:38.073 [kafka-producer-network-thread | maximo-mp] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Received API_VERSIONS response from node 5 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=maximo-mp, correlationId=8): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, maxVersion=13), ApiVersion(apiKey=2, minVersion=0, maxVersion=7), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=5), ApiVersion(apiKey=5, minVersion=0, maxVersion=3), ApiVersion(apiKey=6, minVersion=0, maxVersion=7), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=7), ApiVersion(apiKey=12, minVersion=0, m... 
09:50:38.073 [kafka-producer-network-thread | maximo-mp] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Node 5 has finalized features epoch: 1, finalized features: [], supported features: [], API versions: (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 13 [usable: 12], ListOffsets(2): 0 to 7 [usable: 6], Metadata(3): 0 to 12 [usable: 11], LeaderAndIsr(4): 0 to 5 [usable: 5], StopReplica(5): 0 to 3 [usable: 3], UpdateMetadata(6): 0 to 7 [usable: 7], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], Del... 09:50:38.074 [kafka-producer-network-thread | maximo-mp] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] ProducerId of partition sqout-0 set to 43458621 with epoch 0. Reinitialize sequence at beginning. 
09:50:38.074 [kafka-producer-network-thread | maximo-mp] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator - [Producer clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Assigned producerId 43458621 and producerEpoch 0 to batch with base sequence 0 being sent to partition sqout-0 09:50:38.075 [kafka-producer-network-thread | maximo-mp] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=maximo-mp, correlationId=9) and timeout 3 to node 5: {acks=-1,timeout=3,partitionSizes=[sqout-0=4181]} 09:50:38.095 [kafka-producer-network-thread | maximo-mp] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Received PRODUCE response from node 5 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=maximo-mp, correlationId=9): ProduceResponseData(responses=[TopicProduceResponse(name='sqout', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=796494, logAppendTimeMs=-1, logStartOffset=768203, recordErrors=[], errorMessage=null)])], throttleTimeMs=0) 09:50:38.095 [kafka-producer-network-thread | maximo-mp] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] ProducerId: 43458621; Set last ack'd sequence number for topic-partition sqout-0 to 0 09:50:38.147 [pool-7-thread-19] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION 09:50:38.147 [pool-7-thread-19] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Enqueuing transactional 
request EndTxnRequestData(transactionalId='sqout-3664816744674374805414', producerId=43458621, produ
[PR] MINOR: fix typo [kafka]
appchemist opened a new pull request, #15643: URL: https://github.com/apache/kafka/pull/15643 fix typo amd -> and ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1547302390 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -391,4 +447,15 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + +private void retryUntilEqual(List expected, List output) { Review Comment: Ahh yes. I've revised it to accept a function now, so the output should be refreshed during the wait loop. Let's see how the build goes.
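The point of this review thread is that a retry helper must re-read the output on each attempt, which a plain `List` argument cannot do. A minimal standalone sketch of that idea follows. It deliberately does not use Kafka's `TestUtils.waitForCondition`; `RetrySketch`, the `timeoutMs` and `pollMs` parameters, and the sample data are assumptions made for illustration.

```java
import java.util.List;
import java.util.function.Supplier;

final class RetrySketch {
    // Polls the supplier until it yields the expected list or the timeout elapses.
    static <T> boolean retryUntilEqual(List<T> expected, Supplier<List<T>> outputSupplier,
                                       long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            // The supplier is invoked on every pass, so each comparison sees fresh output.
            if (expected.equals(outputSupplier.get())) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical source whose output only becomes correct on the third read.
        Supplier<List<String>> eventual = () -> {
            calls[0]++;
            return calls[0] < 3 ? List.<String>of() : List.of("topic1:0:1");
        };
        System.out.println(retryUntilEqual(List.of("topic1:0:1"), eventual, 2_000, 10));
    }
}
```

Passing an already computed list instead of a supplier would compare the same snapshot on every iteration, so the loop could only ever succeed on its first pass.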
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1547303323 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -94,15 +101,47 @@ private void setUp() { } } +private void createConsumerAndPoll() { +Properties props = new Properties(); +props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); +props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group"); +props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + +try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { +List topics = new ArrayList<>(); +for (int i = 0; i < topicCount + 1; i++) { +topics.add(getTopicName(i)); +} +consumer.subscribe(topics); +consumer.poll(consumerTimeout); +TestUtils.waitForCondition( Review Comment: Yess. Removed
Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]
chia7712 commented on PR #15639: URL: https://github.com/apache/kafka/pull/15639#issuecomment-2031262693 @gharris1727 just curious. How did you notice the incorrect changes? Are there failing tests related to #15597?
[PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]
Joker-5 opened a new pull request, #15642: URL: https://github.com/apache/kafka/pull/15642 Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)