[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140208#comment-17140208 ] Matthias J. Sax commented on KAFKA-10179: - What you say is fair I guess. Given the current code, if you want to do any of those, you need to disable the optimization. However, for the actual bug this ticket is about, the problem seems to be that if the optimization is turned on, at some point in the code we pass the changelog topic name into the serde instead of the source topic name. And thus the schema cannot be found and the serde crashes. Thus, this ticket should focus on this bug. Not sure if KAFKA-8037 covers all cases you describe. Maybe you want to follow up on this ticket (so we can extend its scope) or create a new ticket that describes the shortcomings of the current implementation. > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{ ID>--changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
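The failure mode discussed in KAFKA-10179 can be illustrated with a minimal Java sketch. This is not the Kafka Streams implementation: `ChangelogTopicSketch`, `REGISTRY`, `topicForSerde` and `lookupSchemaId` are hypothetical names, the default changelog name is assumed to be built from the application ID and store name, the `<topic>-value` subject lookup only mimics a topic-aware serde, and `IllegalStateException` stands in for `SerializationException` to keep the example free of dependencies.

```java
import java.util.HashMap;
import java.util.Map;

final class ChangelogTopicSketch {
    // Hypothetical stand-in for a schema registry: subject name -> schema id.
    static final Map<String, Integer> REGISTRY = new HashMap<>();

    // Assumed default naming of a store changelog topic.
    static String defaultChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // For an optimized source table the source topic doubles as the changelog,
    // so that is the name a topic-aware serde needs to receive.
    static String topicForSerde(String applicationId, String storeName, String optimizedSourceTopic) {
        return optimizedSourceTopic != null
            ? optimizedSourceTopic
            : defaultChangelogTopic(applicationId, storeName);
    }

    // Topic-aware lookup: fails when nothing is registered under "<topic>-value".
    static int lookupSchemaId(String topic) {
        Integer id = REGISTRY.get(topic + "-value");
        if (id == null) {
            throw new IllegalStateException("No schema registered for subject " + topic + "-value");
        }
        return id;
    }

    public static void main(String[] args) {
        REGISTRY.put("users-value", 7); // schema exists only for the source topic

        // Passing the source topic works.
        System.out.println(lookupSchemaId(topicForSerde("app", "users-store", "users")));

        // Passing the default changelog name reproduces the kind of failure in the ticket.
        try {
            lookupSchemaId(defaultChangelogTopic("app", "users-store"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is only that the topic name must be chosen before the serde is invoked; where exactly that choice belongs in the Streams code base is up to the actual fix.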
[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140203#comment-17140203 ] Matthias J. Sax commented on KAFKA-9509: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7061/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/] {quote}java.lang.AssertionError: Connector MirrorCheckpointConnector tasks did not start in time on cluster: org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster@5376246b at org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:120) at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationTest.java:191) at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.setup(MirrorConnectorsIntegrationTest.java:184){quote} > Fix flaky test MirrorConnectorsIntegrationTest.testReplication > -- > > Key: KAFKA-9509 > URL: https://issues.apache.org/jira/browse/KAFKA-9509 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Sanjana Kaundinya >Assignee: Luke Chen >Priority: Major > Fix For: 2.5.0 > > > The test > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication > is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of > when the connectors and tasks are started up. The fix for this would make it > such that when the connectors are started up, to wait until the REST endpoint > returns a positive number of tasks to be confident that we can start testing.
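The approach named in the KAFKA-9509 description, waiting until the REST endpoint reports a positive number of tasks, amounts to a bounded polling loop. The following is a rough sketch under stated assumptions: `WaitForTasksSketch` and `waitForTasks` are hypothetical names, and the `IntSupplier` stands in for whatever call actually queries the Connect REST endpoint.

```java
import java.util.function.IntSupplier;

final class WaitForTasksSketch {
    // Polls taskCount until it reports at least minTasks or the timeout expires.
    // Returns true on success, false on timeout or interruption.
    static boolean waitForTasks(IntSupplier taskCount, int minTasks, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (taskCount.getAsInt() >= minTasks) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Hypothetical task counter: reports one task once 50 ms have elapsed.
        boolean started = waitForTasks(
            () -> System.currentTimeMillis() - start >= 50 ? 1 : 0, 1, 2000, 10);
        System.out.println("tasks started: " + started);
    }
}
```

A test setup could call such a helper before producing any records, so that a slow connector start consumes its own timeout instead of the time budget of the replication checks.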
[GitHub] [kafka] mjsax commented on pull request #8886: KAFKA-9891: fix corrupted StandbyTask state
mjsax commented on pull request #8886: URL: https://github.com/apache/kafka/pull/8886#issuecomment-646427756 Java 8 passed. Java 11: ``` org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9846) Race condition can lead to severe lag underestimate for active tasks
[ https://issues.apache.org/jira/browse/KAFKA-9846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140193#comment-17140193 ] Sophie Blee-Goldman commented on KAFKA-9846: Just realized I never replied [~vinoth]. To answer your question about how to get a task stuck in CREATED during a test, one way would be to start up an instance with two threads and have one of them hang indefinitely. It will drop out of the group and its task will be assigned to the other thread, but since the first thread hasn't released the task state directory lock, this task will be stuck in CREATED. Anyways, just bringing this up since [~vvcephei] is setting up the 2.5.1 release. It seems like we understand the problem and the fix is quite straightforward, can we get this patched for 2.5.1? > Race condition can lead to severe lag underestimate for active tasks > > > Key: KAFKA-9846 > URL: https://issues.apache.org/jira/browse/KAFKA-9846 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Sophie Blee-Goldman >Assignee: Vinoth Chandar >Priority: Critical > Fix For: 2.6.0 > > > In KIP-535 we added the ability to query still-restoring and standby tasks. > To give users control over how out of date the data they fetch can be, we > added an API to KafkaStreams that fetches the end offsets for all changelog > partitions and computes the lag for each local state store. > During this lag computation, we check whether an active task is in RESTORING > and calculate the actual lag if so. If not, we assume it's in RUNNING and > return a lag of zero. However, tasks may be in other states besides running > and restoring; notably they first pass through the CREATED state before > getting to RESTORING. A CREATED task may happen to be caught-up to the end > offset, but in many cases it is likely to be lagging or even completely > uninitialized. 
> This introduces a race condition where users may be led to believe that a > task has zero lag and is "safe" to query even with the strictest correctness > guarantees, while the task is actually lagging by some unknown amount. > During transfer of ownership of the task between different threads on the > same machine, tasks can actually spend a while in CREATED while the new owner > waits to acquire the task directory lock. So, this race condition may not be > particularly rare in multi-threaded Streams applications -- This message was sent by Atlassian Jira (v8.3.4#803005)
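The race described in KAFKA-9846 comes down to which task states are treated as having zero lag. A minimal sketch follows, assuming hypothetical names (`LagSketch`, `lagAsDescribed`, `lagConservative`) and assuming that a not-yet-restored offset can be represented as a plain `long`; it is not the KafkaStreams implementation and not necessarily the shape of the eventual patch.

```java
final class LagSketch {
    enum TaskState { CREATED, RESTORING, RUNNING }

    // Behaviour described in the ticket: every state other than RESTORING
    // is assumed to be RUNNING and therefore fully caught up.
    static long lagAsDescribed(TaskState state, long changelogEndOffset, long restoredOffset) {
        if (state == TaskState.RESTORING) {
            return Math.max(0L, changelogEndOffset - restoredOffset);
        }
        return 0L;
    }

    // One possible correction: only RUNNING is assumed to have zero lag,
    // so a CREATED task reports its real distance from the end offset.
    static long lagConservative(TaskState state, long changelogEndOffset, long restoredOffset) {
        if (state == TaskState.RUNNING) {
            return 0L;
        }
        return Math.max(0L, changelogEndOffset - restoredOffset);
    }

    public static void main(String[] args) {
        // A CREATED task that has restored up to offset 40 of 100.
        System.out.println(lagAsDescribed(TaskState.CREATED, 100L, 40L));
        System.out.println(lagConservative(TaskState.CREATED, 100L, 40L));
    }
}
```

With the first method a CREATED task waiting on the state directory lock looks safe to query; with the second it reports the gap to the changelog end offset.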
[jira] [Updated] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
[ https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-7271: --- Fix Version/s: (was: 2.6.0) 2.7.0 > Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers > --- > > Key: KAFKA-7271 > URL: https://issues.apache.org/jira/browse/KAFKA-7271 > Project: Kafka > Issue Type: Improvement > Components: streams, system tests >Reporter: John Roesler >Priority: Blocker > Fix For: 2.7.0 > > > Fix in the oldest branch that ignores the test and cherry-pick forward.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close
ableegoldman commented on a change in pull request #8900: URL: https://github.com/apache/kafka/pull/8900#discussion_r442619557 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -69,7 +68,6 @@ private final ChangelogReader changelogReader; private final UUID processId; private final String logPrefix; -private final StreamsMetricsImpl streamsMetrics; Review comment: My IDEA pointed out that this was never used
[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close
ableegoldman commented on a change in pull request #8900: URL: https://github.com/apache/kafka/pull/8900#discussion_r442619459 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -522,7 +522,7 @@ private void close(final boolean clean) { if (clean && commitNeeded) { log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to" + " commit and should close as dirty instead"); -throw new StreamsException("Tried to close dirty task as clean"); Review comment: This was a sort-of bug: because we don't close things during `handleRevocation`, we want to make sure the TM will close this as dirty during `handleAssignment`. So we throw this just to force it to call `closeDirty` -- but it wasn't necessarily a fatal exception that caused commit to fail, so we should just throw TaskMigrated here. That said, it doesn't really matter since the ConsumerCoordinator will save and rethrow only the first exception, which is the `handleRevocation` exception. Anything we throw in `handleAssignment` is "lost" -- but we should do the right thing anyway
[GitHub] [kafka] hachikuji merged pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
hachikuji merged pull request #8850: URL: https://github.com/apache/kafka/pull/8850
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140166#comment-17140166 ] Rohan Desai commented on KAFKA-10179: - Also, it's not really clear from the documentation that `serialize(deserialize())` is assumed to be the identity function for `ktable(..)`. > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{ ID>--changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown.
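The identity assumption mentioned in this comment breaks as soon as a deserializer projects fields away. As a small illustration only, with a made-up comma-separated record format and hypothetical names (`ProjectingSerdeSketch`, `deserialize`, `serialize`), a round trip through a projecting deserializer does not reproduce the source bytes:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class ProjectingSerdeSketch {
    // Keeps only the first `keep` comma-separated fields of a record.
    static String[] deserialize(byte[] data, int keep) {
        String[] all = new String(data, StandardCharsets.UTF_8).split(",", -1);
        return Arrays.copyOf(all, Math.min(keep, all.length));
    }

    // Writes the remaining fields back in the same comma-separated format.
    static byte[] serialize(String[] fields) {
        return String.join(",", fields).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] source = "a,b,c,d,e,f,g,h,i,j".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = serialize(deserialize(source, 3));

        // The store content would differ from the source topic content.
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8));
        System.out.println(Arrays.equals(source, roundTrip));
    }
}
```

Under that kind of serde, restoring a store by copying raw source records would yield different bytes than the ones written during normal processing.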
[jira] [Commented] (KAFKA-9974) Flaky Test OptimizedKTableIntegrationTest#shouldApplyUpdatesToStandbyStore
[ https://issues.apache.org/jira/browse/KAFKA-9974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140162#comment-17140162 ] Luke Chen commented on KAFKA-9974: -- I don't have any ideas for fixing these errors. Let's see if anyone else wants to investigate. Thanks. > Flaky Test OptimizedKTableIntegrationTest#shouldApplyUpdatesToStandbyStore > -- > > Key: KAFKA-9974 > URL: https://issues.apache.org/jira/browse/KAFKA-9974 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Priority: Major > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/321/testReport/junit/org.apache.kafka.streams.integration/OptimizedKTableIntegrationTest/shouldApplyUpdatesToStandbyStore/] > {quote}java.lang.AssertionError: Expected: is <0L> but: was <43L> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at > org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore(OptimizedKTableIntegrationTest.java:138){quote}
[jira] [Assigned] (KAFKA-9974) Flaky Test OptimizedKTableIntegrationTest#shouldApplyUpdatesToStandbyStore
[ https://issues.apache.org/jira/browse/KAFKA-9974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-9974: Assignee: (was: Luke Chen) > Flaky Test OptimizedKTableIntegrationTest#shouldApplyUpdatesToStandbyStore > -- > > Key: KAFKA-9974 > URL: https://issues.apache.org/jira/browse/KAFKA-9974 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Priority: Major > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/321/testReport/junit/org.apache.kafka.streams.integration/OptimizedKTableIntegrationTest/shouldApplyUpdatesToStandbyStore/] > {quote}java.lang.AssertionError: Expected: is <0L> but: was <43L> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at > org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore(OptimizedKTableIntegrationTest.java:138){quote}
[jira] [Comment Edited] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140158#comment-17140158 ] Rohan Desai edited comment on KAFKA-10179 at 6/19/20, 3:19 AM: --- Deserialization may itself be a transformation. For example, suppose I have source data with 10 fields, but only care about 3 of them for my stream processing app. It seems that it would be reasonable to provide a deserializer that just extracts those 3 fields. I suppose you could express this as a projection after creating the table, but that does preclude optimizations that use selective deserialization. And it may be much more expensive to do the materialization (since you're potentially materializing lots of data unnecessarily). I think there should be some way to achieve each of the following: * optimized and the data in the store is exactly the same as the topic data . In this case (what's implemented today) the data can be restored by writing the source records into the store * optimized and the deserializer transforms the data somehow. In this case the data can be restored by deserializing/serializing each row from the source topic before writing it into the store. I don't think this is possible today. * not optimized (w/ which you could have a transforming deserializer and faster recovery, at the cost of extra data in kafka). I don't think this is possible today without turning all optimizations off. > This is a known issue and tracked via: >https://issues.apache.org/jira/browse/KAFKA-8037 ack - thanks! was (Author: desai.p.rohan): Deserialization may itself be a transformation. For example, suppose I have source data with 10 fields, but only care about 3 of them for my stream processing app. It seems that it would be reasonable to provide a deserializer that just extracts those 3 fields. I suppose you could express this as a projection after creating the table, but that does preclude optimizations that use selective deserialization. 
And it may be much more expensive to do the materialization (since you're potentially materializing lots of data unnecessarily). I think there should be some way to achieve each of the following: * optimized and the data in the store is exactly the same as the topic data . In this case (what's implemented today) the data can be restored by writing the source records into the store * optimized and the deserializer transforms the data somehow. In this case the data can be restored by deserializing/serializing each row from the source topic before writing it into the store. I don't think this is possible today. * not optimized (which would you have a transforming deserializer and faster recovery, at the cost of extra data in kafka). I don't think this is possible today without turning all optimizations off. > This is a known issue and tracked via: >https://issues.apache.org/jira/browse/KAFKA-8037 ack - thanks! > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{ ID>--changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140158#comment-17140158 ] Rohan Desai commented on KAFKA-10179: - Deserialization may itself be a transformation. For example, suppose I have source data with 10 fields, but only care about 3 of them for my stream processing app. It seems that it would be reasonable to provide a deserializer that just extracts those 3 fields. I suppose you could express this as a projection after creating the table, but that does preclude optimizations that use selective deserialization. And it may be much more expensive to do the materialization (since you're potentially materializing lots of data unnecessarily). I think there should be some way to achieve each of the following: * optimized and the data in the store is exactly the same as the topic data. In this case (what's implemented today) the data can be restored by writing the source records into the store * optimized and the deserializer transforms the data somehow. In this case the data can be restored by deserializing/serializing each row from the source topic before writing it into the store. I don't think this is possible today. * not optimized (with which you could have a transforming deserializer and faster recovery, at the cost of extra data in kafka). I don't think this is possible today without turning all optimizations off. > This is a known issue and tracked via: >https://issues.apache.org/jira/browse/KAFKA-8037 ack - thanks! > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. 
Currently, it always passes {{ ID>--changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon edited a comment on pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test
showuon edited a comment on pull request #8894: URL: https://github.com/apache/kafka/pull/8894#issuecomment-646406717 @ryannedolan @skaundinya15 , thanks for the good suggestions. Yes, adding retries the way I did is the same as increasing the timeout value. I also found that `waitForCondition` is not a good fit in this case, because the `consume` method in `EmbeddedKafkaCluster.java` does not return the partially consumed records (so that we could accumulate them) if it can't consume the expected number of records in time. Instead, it throws an exception directly. In other words, the `consume` method is already doing the job of `waitForCondition`. So, what we need to do is just to **increase the timeout value** to give the MM2 replication more time. It should make this test more stable. Thank you.
[GitHub] [kafka] showuon commented on pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test
showuon commented on pull request #8894: URL: https://github.com/apache/kafka/pull/8894#issuecomment-646406717 @ryannedolan @skaundinya15 , thanks for the good suggestions. Yes, adding retries the way I did is the same as increasing the timeout value. After some investigation, I found that `waitForCondition` is not a good fit in this case, because the `consume` method in `EmbeddedKafkaCluster.java` does not return the consumed records if there are fewer of them than expected; it throws an exception directly. In other words, the `consume` method is already doing the job of `waitForCondition`. So, what we need to do is just to **increase the timeout value** to give the MM2 replication more time. It should make this test more stable. Thank you.
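The behaviour attributed to `consume` in this comment can be pictured roughly as follows. This is a simplified sketch and not the code in `EmbeddedKafkaCluster.java`: `ConsumeSketch` is a hypothetical name, the `Supplier` stands in for a consumer poll, and `IllegalStateException` is an arbitrary choice of exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

final class ConsumeSketch {
    // Polls until `expected` records have arrived or the timeout expires.
    // On timeout the partial result is dropped and an exception is thrown.
    static <T> List<T> consume(int expected, long timeoutMs, Supplier<List<T>> poll) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        List<T> records = new ArrayList<>();
        while (records.size() < expected) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException(
                    "Got " + records.size() + " of " + expected + " records in time");
            }
            records.addAll(poll.get());
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b");
        System.out.println(consume(2, 1000, () -> batch));
    }
}
```

Because the loop already waits internally and the caller never sees a partial result, wrapping such a method in an outer retry or `waitForCondition` adds little beyond extra time, which is why raising the timeout is the simpler change.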
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140138#comment-17140138 ] harsha commented on KAFKA-7500: --- Hi, does MirrorMaker 2 support Kafka version 0.10.0.0? > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295]
[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140089#comment-17140089 ] Sophie Blee-Goldman commented on KAFKA-10184: - This is all pretty pathetic lol. Good 'ol Jenkins Decreasing the message size might help though. I don't have any real evidence to back up the claim that startup might be occupying a large portion of this time, it just seems like good practice. Who knows what Jenkins is doing – maybe it's taking so long just to create topics? > Flaky > HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores > -- > > Key: KAFKA-10184 > URL: https://issues.apache.org/jira/browse/KAFKA-10184 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Minor > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 12. Input > records haven't all been written to the changelog: 442 > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at >
[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140088#comment-17140088 ] John Roesler commented on KAFKA-10184: -- What in the world... How can we not have processed even 500 records in two minutes? I agree waiting for start up first would probably help. Do we have any logs that could confirm the hypothesis that the startup phase is eating up a bunch of our timeout? We should probably decrease the size of the records down from a whopping 1kB. My intent was to bridge the integration and system test worlds by creating “realistic” data here, but maybe that was expecting too much of the CIT infrastructure. > Flaky > HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores > -- > > Key: KAFKA-10184 > URL: https://issues.apache.org/jira/browse/KAFKA-10184 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Minor > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 12. 
Input > records haven't all been written to the changelog: 442 > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at
[jira] [Resolved] (KAFKA-10113) LogTruncationException sets fetch offsets incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-10113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10113. - Fix Version/s: 2.6.0 Resolution: Fixed > LogTruncationException sets fetch offsets incorrectly > - > > Key: KAFKA-10113 > URL: https://issues.apache.org/jira/browse/KAFKA-10113 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.6.0 > > > LogTruncationException, which extends OffsetOutOfRangeException, takes the > divergent offsets in the constructor. These are the first offsets known to > diverge from what the consumer read. These are then passed to the > OffsetOutOfRangeException incorrectly as the out of range fetch offsets. -- This message was sent by Atlassian Jira (v8.3.4#803005)
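The bug described in this ticket can be illustrated with a small, self-contained sketch. It uses hypothetical stand-in classes (`OutOfRange`, `Truncation`) and plain `String` partition names rather than the real Kafka exception classes, whose exact constructor signatures are not shown in this thread. The point is only that the parent exception should receive the fetch offsets, while the divergent offsets are kept separately.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-ins; these are NOT the Kafka classes named in the ticket.
final class TruncationSketch {

    // Plays the role of OffsetOutOfRangeException: carries the out-of-range fetch offsets.
    static class OutOfRange extends RuntimeException {
        private final Map<String, Long> outOfRangeOffsets;

        OutOfRange(final String message, final Map<String, Long> outOfRangeOffsets) {
            super(message);
            this.outOfRangeOffsets = outOfRangeOffsets;
        }

        Map<String, Long> offsetOutOfRangePartitions() {
            return outOfRangeOffsets;
        }
    }

    // Plays the role of LogTruncationException: additionally carries the divergent offsets.
    static class Truncation extends OutOfRange {
        private final Map<String, Long> divergentOffsets;

        Truncation(final String message,
                   final Map<String, Long> fetchOffsets,
                   final Map<String, Long> divergentOffsets) {
            // The reported bug corresponds to passing divergentOffsets to the parent here.
            super(message, fetchOffsets);
            this.divergentOffsets = divergentOffsets;
        }

        Map<String, Long> divergentOffsets() {
            return divergentOffsets;
        }
    }

    public static void main(final String[] args) {
        final Truncation e = new Truncation(
            "truncation detected",
            Collections.singletonMap("topic-0", 42L),   // offset the consumer tried to fetch
            Collections.singletonMap("topic-0", 30L));  // first offset known to diverge
        System.out.println("fetch offsets:     " + e.offsetOutOfRangePartitions());
        System.out.println("divergent offsets: " + e.divergentOffsets());
    }
}
```

Keeping the two maps apart lets a caller that only handles the parent type still see the offsets it actually requested.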
[jira] [Updated] (KAFKA-10113) LogTruncationException sets fetch offsets incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-10113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-10113: Affects Version/s: 2.5.0 2.4.1 > LogTruncationException sets fetch offsets incorrectly > - > > Key: KAFKA-10113 > URL: https://issues.apache.org/jira/browse/KAFKA-10113 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.6.0 > > > LogTruncationException, which extends OffsetOutOfRangeException, takes the > divergent offsets in the constructor. These are the first offsets known to > diverge from what the consumer read. These are then passed to the > OffsetOutOfRangeException incorrectly as the out of range fetch offsets.
[GitHub] [kafka] hachikuji merged pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
hachikuji merged pull request #8822: URL: https://github.com/apache/kafka/pull/8822 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
[ https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140082#comment-17140082 ] Matthias J. Sax commented on KAFKA-8266: The other ticket is resolved, but I just saw another failure for this test: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7034/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/] {quote}org.scalatest.exceptions.TestFailedException: The remaining consumers in the group could not fetch the expected records at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions.fail(Assertions.scala:1091) at org.scalatest.Assertions.fail$(Assertions.scala:1087) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:329){quote} \cc [~dajac] > Improve > `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup` > > > Key: KAFKA-8266 > URL: https://issues.apache.org/jira/browse/KAFKA-8266 > Project: Kafka > Issue Type: Test >Reporter: Jason Gustafson >Priority: Major > > Some additional validation could be done after the member gets kicked out. > The main thing is showing that the group can continue to consume data and > commit offsets.
[jira] [Issue Comment Deleted] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception
[ https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arun R updated KAFKA-10186: --- Comment: was deleted (was: I would love to take a look if no one else is looking at it.) > Aborting transaction with pending data should throw non-fatal exception > --- > > Key: KAFKA-10186 > URL: https://issues.apache.org/jira/browse/KAFKA-10186 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip, newbie, newbie++ > > Currently if you try to abort a transaction with any pending (non-flushed) > data, the send exception is set to > {code:java} > KafkaException("Failing batch since transaction was aborted"){code} > This exception type is generally considered fatal, but this is a valid state > to be in -- the point of throwing the exception is to alert that the records > will not be sent, not that you are in an unrecoverable error state. > We should throw a different (possibly new) type of exception here to > distinguish between fatal and recoverable errors.
[GitHub] [kafka] mjsax commented on pull request #8886: KAFKA-9891: fix corrupted StandbyTask state
mjsax commented on pull request #8886: URL: https://github.com/apache/kafka/pull/8886#issuecomment-646376402 Retest this please.
[GitHub] [kafka] mjsax commented on pull request #8886: KAFKA-9891: fix corrupted StandbyTask state
mjsax commented on pull request #8886: URL: https://github.com/apache/kafka/pull/8886#issuecomment-646376357 Java 8 passed. Java 11:
```
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
```
[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…
mjsax commented on a change in pull request #8887: URL: https://github.com/apache/kafka/pull/8887#discussion_r442574574

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##
@@ -1048,4 +1042,28 @@ public String toString(final String indent) {
     Set lockedTaskDirectories() {
         return Collections.unmodifiableSet(lockedTaskDirectories);
     }
+
+    public static void executeAndMaybeSwallow(final boolean clean,
+                                              final Runnable runnable,
+                                              final java.util.function.Consumer actionIfClean,
+                                              final java.util.function.Consumer actionIfNotClean) {

Review comment: I am wondering if adding this method to `TaskManager` is the best choice (cf https://issues.apache.org/jira/browse/KAFKA-10055). Thoughts?
[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…
mjsax commented on a change in pull request #8887: URL: https://github.com/apache/kafka/pull/8887#discussion_r442574113

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##
@@ -741,29 +741,23 @@ void shutdown(final boolean clean) {
         for (final Task task : tasks.values()) {
             if (task.isActive()) {
-                try {
-                    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                } catch (final RuntimeException e) {
-                    if (clean) {
-                        firstException.compareAndSet(null, e);
-                    } else {
-                        log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e);
-                    }
-                }
+                executeAndMaybeSwallow(
+                    clean,
+                    () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+                    e -> firstException.compareAndSet(null, e),
+                    e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+                );
             }
         }

         tasks.clear();

-        try {
-            activeTaskCreator.closeThreadProducerIfNeeded();
-        } catch (final RuntimeException e) {
-            if (clean) {
-                firstException.compareAndSet(null, e);
-            } else {
-                log.warn("Ignoring an exception while closing thread producer.", e);
-            }
-        }
+        executeAndMaybeSwallow(
+            clean,
+            () -> activeTaskCreator.closeThreadProducerIfNeeded(),

Review comment: This is a lambda. Do you mean method reference?
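For readers following this review thread, the helper under discussion can be sketched roughly as follows. This is a minimal stand-alone sketch inferred from the try/catch blocks removed in the diff, not the merged Kafka code: the class name `SwallowSketch`, the failing `closeThreadProducerIfNeeded` stand-in, and the `Consumer<RuntimeException>` type parameter are assumptions (the generic types are not visible in the quoted diff). It also shows the method-reference form that the review comment asks about.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-alone class; in the PR the method is added to TaskManager.
final class SwallowSketch {

    // Runs the action. If it throws a RuntimeException, the exception is handed to
    // one of two handlers, depending on whether this is a clean or a dirty shutdown.
    static void executeAndMaybeSwallow(final boolean clean,
                                       final Runnable runnable,
                                       final Consumer<RuntimeException> actionIfClean,
                                       final Consumer<RuntimeException> actionIfNotClean) {
        try {
            runnable.run();
        } catch (final RuntimeException e) {
            if (clean) {
                actionIfClean.accept(e);
            } else {
                actionIfNotClean.accept(e);
            }
        }
    }

    // Stand-in for activeTaskCreator.closeThreadProducerIfNeeded(); it always fails here.
    static void closeThreadProducerIfNeeded() {
        throw new IllegalStateException("producer already closed");
    }

    public static void main(final String[] args) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

        // Clean shutdown: remember the first exception so the caller can rethrow it later.
        executeAndMaybeSwallow(
            true,
            SwallowSketch::closeThreadProducerIfNeeded, // method reference instead of a lambda
            e -> firstException.compareAndSet(null, e),
            e -> System.err.println("Ignoring an exception while closing thread producer: " + e));

        System.out.println("recorded: " + firstException.get().getMessage());
    }
}
```

A method reference is only possible when the call takes no extra arguments, which is why the per-task call with `task.id()` in the diff has to stay a lambda.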
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140075#comment-17140075 ] Matthias J. Sax commented on KAFKA-10179: - {quote}I'm not sure it's correct to use the same "topic" name for materializing optimized source tables, as it's logically different data. In the normal flow (not recovery), we're taking the topic data, validating/transforming it by deserializing it (which might apply some transforms like projecting just fields of interest), and then serializing it, and then writing it into the store. So the "topic" we pass to the serializer should be different since it represents different data from the source topic. For this case, the source-topic-changelog optimization does not apply, and the store would always have its own changelog topic. And thus, the input-topic schema registered in the SR should not be "touched", and the write to the changelog topic should register a new schema using the changelog topic name. Thus, no naming issue in SR should happen. {quote} The source-topic-changelog optimization really only applies if the data in the input topic is exactly the same as in the changelog topic and thus, we avoid creating the changelog topic. To ensure this, we don't allow any processing to happen in between. The data would be deserialized and re-serialized using the same Serde (this is an inefficiency we pay, as we also need to send the de-serialized data downstream for further processing). {quote}Another issue that I think exists (need to try to reproduce) that deserializing/serializing would solve is skipped validation. The source topic deserializer functions as a sort of validator for records from the source topic. When the streams app is configured to skip on deserialization errors, bad source records are just skipped. 
However if we restore by just writing those records to the state store, we now hit the deserialization error when reading the state store, which is a query-killing error. {quote} This is a known issue and tracked via: https://issues.apache.org/jira/browse/KAFKA-8037 > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{ ID>--changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown.
[jira] [Comment Edited] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140075#comment-17140075 ] Matthias J. Sax edited comment on KAFKA-10179 at 6/19/20, 12:42 AM: {quote}I'm not sure it's correct to use the same "topic" name for materializing optimized source tables, as it's logically different data. In the normal flow (not recovery), we're taking the topic data, validating/transforming it by deserializing it (which might apply some transforms like projecting just fields of interest), and then serializing it, and then writing it into the store. So the "topic" we pass to the serializer should be different since it represents different data from the source topic. For this case, the soure-topic-changelog optimization does no apply, and the store would always have its own changelog topic. And thus, the input-topic schema registered in the SR should not be "touched", and the write to the changelog topic should register a new scheme using the changelog topic name. Thus, no naming issue in SR should happen. {quote} The source-topic-changelog optimization really only applies, if the data in the input topic is exactly the same as in the changelog topic and thus, we avoid creating the changelog topic. To ensure this, we don't allow any processing to happen in between. The data would be deserialized and re-serialized using the same Serde (this is inefficiency we pay, as we also need to send the de-serialized data downstream for further processing). {quote}Another issue that I think exists (need to try to reproduce) that deserializing/serializing would solve is skipped validation. The source topic deserializer functions as a sort of validator for records from the source topic. When the streams app is configured to skip on deserialization errors, bad source records are just skipped. 
However if we restore by just writing those records to the state store, we now hit the deserialization error when reading the state store, which is a query-killing error. {quote} This is a known issue and tracked via: https://issues.apache.org/jira/browse/KAFKA-8037 was (Author: mjsax): {quote}I'm not sure it's correct to use the same "topic" name for materializing optimized source tables, as it's logically different data. In the normal flow (not recovery), we're taking the topic data, validating/transforming it by deserializing it (which might apply some transforms like projecting just fields of interest), and then serializing it, and then writing it into the store. So the "topic" we pass to the serializer should be different since it represents different data from the source topic. For this case, the soure-topic-changelog optimization does no apply, and the store would always have its own changelog topic. And thus, the input-topic schema registered in the SR should not be "touched", and the write to the changelog topic should register a new scheme using the changelog topic name. Thus, no naming issue in SR should happen. {quote} The source-topic-changelog optimization really only applies, if the data in the input topic is exactly the same as in the changelog topic and thus, we avoid creating the changelog topic. To ensure this, we don't allow any processing to happen in between. The data would be deserialized and re-serialized using the same Serde (this is inefficiency we pay, as we also need to send the de-serialized data downstream for further processing). {quote}Another issue that I think exists (need to try to reproduce) that deserializing/serializing would solve is skipped validation. The source topic deserializer functions as a sort of validator for records from the source topic. When the streams app is configured to skip on deserialization errors, bad source records are just skipped. 
However if we restore by just writing those records to the state store, we now hit the deserialization error when reading the state store, which is a query-killing error. {quote} This is a know issue and tracked via: https://issues.apache.org/jira/browse/KAFKA-8037 > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{ ID>--changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use
[jira] [Commented] (KAFKA-10114) Kafka producer stuck after broker crash
[ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140073#comment-17140073 ] Jason Gustafson commented on KAFKA-10114: - [~ibinyami] Hmm, this is still not very clear to me. > I think records were not timed out because the sequence that times them out > happens after you have a producer id. Network thread is stuck on > maybeWaitForProducerId(called from Sender.java:306) while the relevant > failBatch invocation is only called from sendProducerData() which is executed > after maybeWaitForProducerId (called from Sender.java:334) The logic in `awaitNodeReady` is not blocked on batch completion, but request completion. We do not need `failBatch` in order to complete a pending request and free up room for additional requests. If you can reproduce this, it would be very helpful to see TRACE level logging from the producer. > Kafka producer stuck after broker crash > --- > > Key: KAFKA-10114 > URL: https://issues.apache.org/jira/browse/KAFKA-10114 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.3.1, 2.4.1 >Reporter: Itamar Benjamin >Priority: Critical > > Today two of our kafka brokers crashed (cluster of 3 brokers), and producers > were not able to send new messages. After brokers started again all producers > resumed sending data except for a single one. > at the beginning producer rejected all new messages with TimeoutException: > > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > incoming-mutable-RuntimeIIL-1:12 ms has passed since batch creation > {code} > > then after sometime exception changed to > > {code:java} > org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory > within the configured max blocking time 6 ms. 
> {code} > > > jstack shows kafka-producer-network-thread is waiting to get producer id: > > {code:java} > "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 > cpu=63594017.16ms elapsed=1511219.38s tid=0x7fffd8353000 nid=0x4fa4 > sleeping [0x7ff55c177000] >java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(java.base@11.0.1/Native Method) > at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296) > at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41) > at > org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.lang.Thread.run(java.base@11.0.1/Thread.java:834) Locked > ownable synchronizers: > - None > {code} > > digging into maybeWaitForProducerId(), it waits until some broker is ready > (awaitNodeReady function) which in return calls leastLoadedNode() on > NetworkClient. This one iterates over all brokers and checks if a request can > be sent to it using canSendRequest(). > This is the code for canSendRequest(): > > {code:java} > return connectionStates.isReady(node, now) && selector.isChannelReady(node) > && inFlightRequests.canSendMore(node) > {code} > > > using some debugging tools i saw this expression always evaluates to false > since the last part (canSendMore) is false. 
>
> This is the code for canSendMore:
> {code:java}
> public boolean canSendMore(String node) {
>     Deque queue = requests.get(node);
>     return queue == null || queue.isEmpty() ||
>         (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
> }
> {code}
>
> I verified
> {code:java}
> queue.peekFirst().send.completed()
> {code}
> is true, and that leads to the live lock: since the request queues are full for
> all nodes, a new request to check broker availability and reconnect to it
> cannot be submitted.
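The condition the reporter describes can be reproduced in isolation with a short sketch. It mirrors only the quoted `canSendMore` expression; the `Request` class, the `add` helper, and the limit of 2 are hypothetical stand-ins, and none of this is the actual `NetworkClient` or `InFlightRequests` code. It shows that once a node's queue has reached the per-connection limit, the check returns false even though the head request's send has completed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the per-node in-flight request queues described in the report.
final class InFlightSketch {

    // Stand-in for an in-flight request; only tracks whether its send has completed.
    static final class Request {
        final boolean sendCompleted;

        Request(final boolean sendCompleted) {
            this.sendCompleted = sendCompleted;
        }
    }

    private final Map<String, Deque<Request>> requests = new HashMap<>();
    private final int maxInFlightRequestsPerConnection;

    InFlightSketch(final int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    // Newest request goes to the head of the node's queue.
    void add(final String node, final Request request) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    // Same boolean structure as the expression quoted in the ticket.
    boolean canSendMore(final String node) {
        final Deque<Request> queue = requests.get(node);
        return queue == null || queue.isEmpty()
            || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightRequestsPerConnection);
    }

    public static void main(final String[] args) {
        final InFlightSketch inFlight = new InFlightSketch(2);
        System.out.println("empty queue: " + inFlight.canSendMore("broker-1"));

        // Two requests whose sends completed but which never received a response.
        inFlight.add("broker-1", new Request(true));
        inFlight.add("broker-1", new Request(true));
        System.out.println("full queue:  " + inFlight.canSendMore("broker-1"));
    }
}
```

In this model the only way out of the full-queue state is for a request to leave the queue, which matches the reply earlier in the thread that request completion, not batch completion, is what frees up room.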
[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
hachikuji commented on pull request #8850: URL: https://github.com/apache/kafka/pull/8850#issuecomment-646369171
[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
hachikuji commented on pull request #8850: URL: https://github.com/apache/kafka/pull/8850#issuecomment-646368944 retest this please
[jira] [Commented] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception
[ https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140069#comment-17140069 ] Arun R commented on KAFKA-10186: I would love to take a look if no one else is looking at it. > Aborting transaction with pending data should throw non-fatal exception > --- > > Key: KAFKA-10186 > URL: https://issues.apache.org/jira/browse/KAFKA-10186 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip, newbie, newbie++ > > Currently if you try to abort a transaction with any pending (non-flushed) > data, the send exception is set to > {code:java} > KafkaException("Failing batch since transaction was aborted"){code} > This exception type is generally considered fatal, but this is a valid state > to be in -- the point of throwing the exception is to alert that the records > will not be sent, not that you are in an unrecoverable error state. > We should throw a different (possibly new) type of exception here to > distinguish between fatal and recoverable errors.
[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs
mjsax commented on pull request #8752: URL: https://github.com/apache/kafka/pull/8752#issuecomment-646367241 @sneakyburro Any update on this PR?
[GitHub] [kafka] mjsax commented on pull request #8871: MINOR: code cleanup for inconsistent naming
mjsax commented on pull request #8871: URL: https://github.com/apache/kafka/pull/8871#issuecomment-646365697 Retest this please.
[GitHub] [kafka] mumrah commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
mumrah commented on pull request #8822: URL: https://github.com/apache/kafka/pull/8822#issuecomment-646361035 Sounds good to me
[GitHub] [kafka] vinothchandar commented on pull request #8898: KAFKA-10138: Prefer --bootstrap-server for reassign_partitions command in ducktape tests
vinothchandar commented on pull request #8898: URL: https://github.com/apache/kafka/pull/8898#issuecomment-646360446 @cmccabe Please take a pass when you can.
[GitHub] [kafka] ableegoldman opened a new pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close
ableegoldman opened a new pull request #8900: URL: https://github.com/apache/kafka/pull/8900

If there's any pending data and we haven't flushed the producer when we abort a transaction, a KafkaException is returned for the previous `send`. This is a bit misleading, since the situation is not an unrecoverable error and so the KafkaException is really non-fatal. For now, we should just catch and swallow this in the RecordCollector (see also: [KAFKA-10186](https://issues.apache.org/jira/browse/KAFKA-10186)).

The reason we ended up aborting an un-flushed transaction was the combination of
a. always aborting the ongoing transaction when any task is closed/revoked
b. only committing (and flushing) if at least one of the revoked tasks needs to be committed

Given the above, we can end up with an ongoing transaction that isn't committed since none of the revoked tasks have any data in the transaction. We then abort the transaction anyway, when those tasks are closed. So in addition to the above (swallowing this exception), we should avoid unnecessarily aborting data for tasks that haven't been revoked. We can handle this by splitting the RecordCollector's `close` into a dirty and clean flavor: if dirty, we need to abort the transaction since it may be dirty due to the commit attempt failing. But if clean, we can skip aborting the transaction since we know that either we just committed and thus there is no ongoing transaction to abort, or else the transaction in flight contains no data from the tasks being closed.
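The clean/dirty split proposed in this PR description can be sketched as below. This is an illustrative model only: `TxnProducer`, `Collector`, `closeClean`, and `closeDirty` are hypothetical names chosen for the sketch, and the real RecordCollector in the PR may be structured differently. The sketch shows just the decision described above, namely that only the dirty flavor aborts the ongoing transaction.

```java
// Hypothetical model of a record collector with two close flavors.
final class CollectorCloseSketch {

    // Minimal stand-in for a transactional producer; not the Kafka Producer interface.
    interface TxnProducer {
        void abortTransaction();
    }

    static final class Collector {
        private final TxnProducer producer;
        private final boolean eosEnabled;

        Collector(final TxnProducer producer, final boolean eosEnabled) {
            this.producer = producer;
            this.eosEnabled = eosEnabled;
        }

        // Clean close: either we just committed, or the transaction in flight holds
        // no data from the tasks being closed, so there is nothing to abort.
        void closeClean() {
            // intentionally no abort
        }

        // Dirty close: the commit attempt may have failed, so abort the transaction.
        void closeDirty() {
            if (eosEnabled) {
                producer.abortTransaction();
            }
        }
    }

    public static void main(final String[] args) {
        final int[] aborts = {0};
        final Collector collector = new Collector(() -> aborts[0]++, true);

        collector.closeClean();
        collector.closeDirty();
        System.out.println("aborts after clean + dirty close: " + aborts[0]);
    }
}
```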
[jira] [Resolved] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed
[ https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-10167. --- Resolution: Fixed > Streams EOS-Beta should not try to get end-offsets as read-committed > > > Key: KAFKA-10167 > URL: https://issues.apache.org/jira/browse/KAFKA-10167 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0 > > > This is a bug discovered with the new EOS protocol (KIP-447). Here's the > context: > In Streams, when we are assigned new active tasks, we first try > to restore the state from the changelog topic all the way to the log end > offset, and then we can transition from the `restoring` to the `running` state > to start processing the task. > Before KIP-447, the end-offset call was only triggered after we had passed the > synchronization barrier at the txn-coordinator, which would guarantee that the > txn-marker has been sent and received (otherwise we would error with > CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker > is received, it also means that the marker has been fully replicated, which > in turn guarantees that the data written before that marker has been fully > replicated. As a result, when we send the list-offset with the `read-committed` > flag we are guaranteed that the returned offset == LSO == high-watermark. > After KIP-447, however, we do not fence on the txn-coordinator but on the > group-coordinator upon offset-fetch, and the group-coordinator returns > the fetching offset right after it has received and replicated the txn-marker > sent to it. However, since the txn-markers are sent to different brokers in > parallel, and even within the same broker the markers of different partitions are > appended / replicated independently, when the fetch-offset request > returns it is NOT guaranteed that the LSO on other data partitions has > been advanced as well. Hence, in that case the `endOffset` call may > return a smaller offset, causing data loss.
[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
hachikuji commented on pull request #8822: URL: https://github.com/apache/kafka/pull/8822#issuecomment-646348733 @mumrah Hmm, I think I like the current approach of discarding the response if we're no longer in the same state in which the fetch state was sent. Mainly because it's simple. Arguably we could do something more refined. For example, a topic authorization error is still going to be relevant even if the partition is being reset. However, since we're talking about rare cases, it doesn't seem too worthwhile to try and optimize; worst case, we'll send the request again and get the same error.
[GitHub] [kafka] ijuma opened a new pull request #8899: MINOR: Gate test coverage plugin behind Gradle property
ijuma opened a new pull request #8899: URL: https://github.com/apache/kafka/pull/8899

Most builds don't require test coverage output, so it's wasteful to spend cycles tracking coverage information for each method invoked. I ran a quick test on a fast desktop machine; the absolute difference will be larger on a slower machine. The tests were executed after `./gradlew clean` and with a Gradle daemon that was started just before the test (and mildly warmed up with `./gradlew clean` again).

`./gradlew unitTest --continue --profile`:
* With coverage enabled: 6m32s
* With coverage disabled: 5m47s

I ran the same test twice and the results were within 2s of each other, so reasonably consistent. 16% reduction in the time taken to run the unit tests is a reasonable gain with little downside, so I think this is a good change.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mumrah commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
mumrah commented on pull request #8822: URL: https://github.com/apache/kafka/pull/8822#issuecomment-646338829 @hachikuji yea that's the check I was referring to (where we disregard the fetch response, errors included). Do you think any of the errors we handle besides OOOR are worth handling in the case that we're no longer in the FETCHING state? Like maybe one of the errors that triggers a metadata update? However, that might be adding complexity for little gain. I'm fine with it either way.
[jira] [Updated] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception
[ https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sophie Blee-Goldman updated KAFKA-10186:
Labels: needs-kip newbie newbie++ (was: needs-kip)

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
> Issue Type: Improvement
> Components: producer
> Reporter: Sophie Blee-Goldman
> Priority: Major
> Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed)
> data, the send exception is set to
> {code:java}
> KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state
> to be in -- the point of throwing the exception is to alert that the records
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to
> distinguish between fatal and recoverable errors.
[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment
cmccabe commented on a change in pull request #8891: URL: https://github.com/apache/kafka/pull/8891#discussion_r442535630

## File path: core/src/main/scala/kafka/controller/KafkaController.scala ##
@@ -647,8 +647,14 @@ class KafkaController(val config: KafkaConfig,
 info(s"Skipping reassignment of $tp since the topic is currently being deleted")
 new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")
 } else {
-val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
-if (assignedReplicas.nonEmpty) {
+val assignment = controllerContext.partitionFullReplicaAssignment(tp)
+if (assignment == ReplicaAssignment.empty) {
+ new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")
+} else if (assignment == reassignment) {

Review comment: It seems like the thing to agree on is the final state, right? This comparison is taking into account the current replica set which may change over the course of the reassignment...
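To illustrate the concern in the review comment above, here is a minimal sketch, assuming a simplified model of an assignment. `SketchAssignment` and its fields are hypothetical stand-ins and not the real `kafka.controller.ReplicaAssignment`; the two snapshots are made-up values chosen only to show that a full-state comparison can differ while the final (target) replica set is the same.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of a partition assignment (NOT the real Kafka class).
final class SketchAssignment {
    final List<Integer> replicas;  // full current replica set, including in-flight changes
    final List<Integer> removing;  // replicas that will be dropped once the reassignment completes

    SketchAssignment(List<Integer> replicas, List<Integer> removing) {
        this.replicas = new ArrayList<>(replicas);
        this.removing = new ArrayList<>(removing);
    }

    // The final state: what the replica set will be once the reassignment is done.
    List<Integer> targetReplicas() {
        List<Integer> target = new ArrayList<>(replicas);
        target.removeAll(removing);
        return target;
    }

    // Compares the full in-flight state; sensitive to progress of the reassignment.
    boolean sameFullStateAs(SketchAssignment other) {
        return replicas.equals(other.replicas) && removing.equals(other.removing);
    }

    // Compares only the final state; stable while the reassignment progresses.
    boolean sameTargetAs(SketchAssignment other) {
        return targetReplicas().equals(other.targetReplicas());
    }
}

class SketchAssignmentDemo {
    public static void main(String[] args) {
        // Two hypothetical snapshots of the same reassignment towards [4, 5, 6].
        SketchAssignment early = new SketchAssignment(Arrays.asList(4, 5, 6, 1, 2, 3), Arrays.asList(1, 2, 3));
        SketchAssignment later = new SketchAssignment(Arrays.asList(4, 5, 6, 1), Arrays.asList(1));
        System.out.println("same full state: " + early.sameFullStateAs(later));
        System.out.println("same target:     " + early.sameTargetAs(later));
    }
}
```

Under these assumptions, comparing targets answers "is this the same reassignment?" regardless of how far it has progressed, which is the distinction the comment is asking about.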
[jira] [Assigned] (KAFKA-10169) KafkaException: Failing batch since transaction was aborted
[ https://issues.apache.org/jira/browse/KAFKA-10169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sophie Blee-Goldman reassigned KAFKA-10169:
Assignee: Sophie Blee-Goldman

> KafkaException: Failing batch since transaction was aborted
> ---
>
> Key: KAFKA-10169
> URL: https://issues.apache.org/jira/browse/KAFKA-10169
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Sophie Blee-Goldman
> Assignee: Sophie Blee-Goldman
> Priority: Blocker
> Fix For: 2.6.0
>
> We've seen the following exception in our eos-beta test application recently:
> {code:java}
> [2020-06-13T00:09:14-07:00] (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-25-changelog for task 1_2 due to:
> [2020-06-13T00:09:14-07:00] (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
> [2020-06-13T00:09:14-07:00] (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) Exception handler choose to FAIL the processing, no more records would be sent.
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:213)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185)
>   at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1347)
>   at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>   at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>   at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>   at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>   at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>   at java.lang.Thread.run(Thread.java:748)
> [2020-06-13T00:09:14-07:00] (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) Caused by: org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
>   at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>   ... 3 more
> {code}
> Somewhat unclear if this is an issue with eos-beta specifically, or just eos
> in general. But several threads have died over the course of a few days in
> the eos-beta application, while none so far have died on the eos-alpha
> application.
> It's also unclear (at least to me) whether this is definitely an issue in
> Streams or possibly a bug in the producer (or even the broker, although that
> seems unlikely)
[jira] [Created] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception
Sophie Blee-Goldman created KAFKA-10186:

Summary: Aborting transaction with pending data should throw non-fatal exception
Key: KAFKA-10186
URL: https://issues.apache.org/jira/browse/KAFKA-10186
Project: Kafka
Issue Type: Improvement
Components: producer
Reporter: Sophie Blee-Goldman

Currently if you try to abort a transaction with any pending (non-flushed) data, the send exception is set to
{code:java}
KafkaException("Failing batch since transaction was aborted"){code}
This exception type is generally considered fatal, but this is a valid state to be in -- the point of throwing the exception is to alert that the records will not be sent, not that you are in an unrecoverable error state. We should throw a different (possibly new) type of exception here to distinguish between fatal and recoverable errors.
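A minimal sketch of what the ticket asks for, assuming a dedicated exception subtype for the "records dropped because the transaction was aborted" case. All class names below (`SketchKafkaException`, `SketchTransactionAbortedException`, `SendErrorHandler`) are hypothetical stand-ins defined locally so the example is self-contained; they are not the real producer classes, and the ticket itself leaves the concrete type open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the generic, usually-fatal exception type.
class SketchKafkaException extends RuntimeException {
    SketchKafkaException(String message) {
        super(message);
    }
}

// Hypothetical non-fatal subtype: the batch was dropped because the transaction was aborted.
class SketchTransactionAbortedException extends SketchKafkaException {
    SketchTransactionAbortedException() {
        super("Failing batch since transaction was aborted");
    }
}

// Hypothetical send-callback handler that tells the two cases apart by type.
class SendErrorHandler {
    final List<String> log = new ArrayList<>();

    /** Returns true if the caller may keep processing, false if it should shut down. */
    boolean handle(RuntimeException sendError) {
        if (sendError instanceof SketchTransactionAbortedException) {
            // Expected after an abort: the records were not sent, but the client is still usable.
            log.add("recoverable: " + sendError.getMessage());
            return true;
        }
        // Anything else is treated as unrecoverable in this sketch.
        log.add("fatal: " + sendError.getMessage());
        return false;
    }
}

class SendErrorHandlerDemo {
    public static void main(String[] args) {
        SendErrorHandler handler = new SendErrorHandler();
        handler.handle(new SketchTransactionAbortedException());
        handler.handle(new SketchKafkaException("some other failure"));
        handler.log.forEach(System.out::println);
    }
}
```

With a distinct type, a caller can branch on the exception class instead of matching on the message string, which is the only option when both cases surface as the same generic exception.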
[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging
vvcephei commented on a change in pull request #8896: URL: https://github.com/apache/kafka/pull/8896#discussion_r442531300

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java ##
@@ -223,6 +227,7 @@ public void shouldInitializeChangelogAndCheckForCompletion() {
 @Test
 public void shouldPollWithRightTimeout() {
 EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));

Review comment: This is moderately obnoxious... The addition of logging these values means that these tests will get a NullPointerException unless we mock this call, but the mock is irrelevant to the test outcome.
[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment
cmccabe commented on a change in pull request #8891: URL: https://github.com/apache/kafka/pull/8891#discussion_r442533488

## File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ##
@@ -1685,10 +1702,9 @@ object ReassignPartitionsCommand extends Logging {
 opts.cancelOpt -> collection.immutable.Seq(
 opts.reassignmentJsonFileOpt
 ),
- opts.listOpt -> collection.immutable.Seq(
- )
+ opts.listOpt -> collection.immutable.Seq.empty

Review comment: seems to break the symmetry a bit, doesn't it? Although, I don't feel strongly about this.
[GitHub] [kafka] cmccabe commented on pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment
cmccabe commented on pull request #8891: URL: https://github.com/apache/kafka/pull/8891#issuecomment-646334265 The refactors to the tool generally look good. I thought we always allowed throttles to be set to 0 for some reason, though? Looks like this change removes that ability which we probably don't want... although I doubt many people are using it.
[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging
vvcephei commented on a change in pull request #8896: URL: https://github.com/apache/kafka/pull/8896#discussion_r442529975

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ##
@@ -496,8 +539,9 @@ private void bufferChangelogRecords(final ChangelogMetadata changelogMetadata, f
 } else {
 changelogMetadata.bufferedRecords.add(record);
 final long offset = record.offset();
-if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset)
+if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset) {
 changelogMetadata.bufferedLimitIndex = changelogMetadata.bufferedRecords.size();
+}

Review comment: I've rolled back a bunch of accidental formatting changes, but left the ones that are actually code style compliance issues (like using brackets around conditional bodies).
[GitHub] [kafka] vinothchandar opened a new pull request #8898: KAFKA-10138: Prefer --bootstrap-server for reassign_partitions command in ducktape tests
vinothchandar opened a new pull request #8898: URL: https://github.com/apache/kafka/pull/8898 http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-17--001.1592453352--vinothchandar--KC342-ducktape--e64cc463b/report.html Both ThrottlingTest and ReassignPartitionsTest, which invoke these methods, pass locally and passed twice in CI.
[GitHub] [kafka] dajac opened a new pull request #8897: MINOR; Use the automated protocol for the Consumer Protocol's subscriptions and assignments
dajac opened a new pull request #8897: URL: https://github.com/apache/kafka/pull/8897
[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #8881: KIP-557: Add emit on change support to Kafka Streams
ConcurrencyPractitioner commented on pull request #8881: URL: https://github.com/apache/kafka/pull/8881#issuecomment-646304787 @vvcephei Think you have time to look at this?
[GitHub] [kafka] vvcephei commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster
vvcephei commented on a change in pull request #8892: URL: https://github.com/apache/kafka/pull/8892#discussion_r442497609 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ## @@ -148,27 +150,35 @@ private final TopicPartition t3p2 = new TopicPartition("topic3", 2); private final TopicPartition t3p3 = new TopicPartition("topic3", 3); -private final List infos = asList( -new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) -); - -private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS); +private final List partitionInfos = getPartitionInfos(3, 3); +{ +partitionInfos.add(new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])); +} private final Cluster metadata = new Cluster( "cluster", Collections.singletonList(Node.noNode()), -infos, +partitionInfos, emptySet(), -emptySet()); +emptySet() +); + +/* Used by the scale test for large apps/clusters */ +private static final int NUM_TOPICS_XL = 10; +private static final int NUM_PARTITIONS_PER_TOPIC_XL = 1_000; +private static final int NUM_CONSUMERS_XL = 100; +private static final List TOPICS_LIST_XL = new ArrayList<>(); +private static final Map CHANGELOG_END_OFFSETS_XL = new HashMap<>(); +private static final List 
PARTITION_INFOS_XL = getPartitionInfos(NUM_TOPICS_XL, NUM_PARTITIONS_PER_TOPIC_XL); +private static final Cluster CLUSTER_METADATA_XL = new Cluster( +"cluster", +Collections.singletonList(Node.noNode()), +PARTITION_INFOS_XL, +emptySet(), +emptySet() +); Review comment: If there's a whole set of constants only used by one test, one might wonder whether that test shouldn't just be in its own class... ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ## @@ -148,27 +150,35 @@ private final TopicPartition t3p2 = new TopicPartition("topic3", 2); private final TopicPartition t3p3 = new TopicPartition("topic3", 3); -private final List infos = asList( -new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) -); - -private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS); +private final List partitionInfos = getPartitionInfos(3, 3); +{ +partitionInfos.add(new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])); +} Review comment: Can you just pass this as an argument to `getPartitionInfos` so that we can do all the initialization in the assignment instead of needing an initialization block? 
The fact that this field is used in another field initialization statement makes the initialization block kind of questionable, since you have to read the JVM spec to know if this block executes before or after the usage. Alternatively, maybe the prior code was actually better, because you can see exactly what data you're testing with, instead of having to go read another method to understand what `getPartitionInfos(3, 3)` might mean.
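For readers wondering about the ordering question raised in the review comment above: in Java, instance field initializers and instance initializer blocks run in the textual order in which they appear in the class. A minimal sketch, with a hypothetical class and made-up values (not the test class under review):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; only the ordering of the three members matters.
class InitOrderDemo {
    // 1. Field initializer: runs first because it appears first.
    private final List<String> partitions = new ArrayList<>(Arrays.asList("topic1-0", "topic2-0"));

    // 2. Instance initializer block: runs second, after the field above is assigned.
    {
        partitions.add("topic3-3");
    }

    // 3. Later field initializer: runs third, so it already sees the element added by the block.
    private final int sizeSeenByLaterField = partitions.size();

    int sizeSeenByLaterField() {
        return sizeSeenByLaterField;
    }

    public static void main(String[] args) {
        System.out.println(new InitOrderDemo().sizeSeenByLaterField()); // prints 3
    }
}
```

So a field declared after the block observes the block's effect, but the reader has to know that rule. Doing the whole initialization in a single expression, as the comment suggests, avoids relying on it.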
[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster
vvcephei commented on pull request #8892: URL: https://github.com/apache/kafka/pull/8892#issuecomment-646303241 test this please
[jira] [Assigned] (KAFKA-10160) Kafka MM2 consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sats reassigned KAFKA-10160:
Assignee: sats

> Kafka MM2 consumer configuration
> ---
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.5.0, 2.4.1
> Reporter: Pavol Ipoth
> Assignee: sats
> Priority: Major
> Labels: configuration, kafka, mirror-maker
>
> According to
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51],
> producer/consumer-level properties should be configured as, e.g.,
> somesource->sometarget.consumer.client.id. I tried to set
> somesource->sometarget.consumer.auto.offset.reset=latest, but without
> success: the consumer always tries to fetch from earliest. I am not sure
> whether this is a bug or my misconfiguration, but at least an update to the
> documentation would be useful.
[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas
[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139963#comment-17139963 ] Mateusz Jadczyk commented on KAFKA-9891: LGTM thanks for looking into it and making sure that 2.5.1/2.6 should be safe to use > Invalid state store content after task migration with exactly_once and > standby replicas > --- > > Key: KAFKA-9891 > URL: https://issues.apache.org/jira/browse/KAFKA-9891 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 2.5.0, 2.4.1 >Reporter: Mateusz Jadczyk >Assignee: Matthias J. Sax >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, > state_store_operations.txt, tasks_assignment.txt > > > We have a simple command id deduplication mechanism (very similar to the one > from Kafka Streams examples) based on Kafka Streams State Stores. It stores > command ids from the past hour in _persistentWindowStore_. We encountered a > problem with the store if there's an exception thrown later in that topology. > We run 3 nodes using docker, each with multiple threads set for this > particular Streams Application. > The business flow is as follows (performed within a single subtopology): > * a valid command is sent with command id > (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active > task 1_2. First node in the topology analyses if this is a duplicate by > checking in the state store (_COMMAND_ID_STORE_), if not puts the command id > in the state store and processes the command properly. > * an invalid command is sent with the same key but new command id > (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the > duplicated command id is performed, it's not a duplicate, command id is put > into the state store. Next node in the topology throws an exception which > causes an error on NODE 1 for task 1_2. 
As a result, transaction is aborted, > offsets are not committed. I double checked for the changelog topic - > relevant messages are not committed. Therefore, the changelog topic contains > only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and > not the one which caused a failure. > * in the meantime a standby task 1_2 running on NODE 3 replicated > _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local > _COMMAND_ID_STORE_ > * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. > It checks if this command id is a duplicate - no, it isn't - tries to process > the faulty command and throws an exception. Again, transaction aborted, all > looks fine. > * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, > *it is a duplicate!* Even though the transaction has been aborted and the > changelog doesn't contain this command id: > _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._ > > After digging into the Streams logs and some discussion on ([Stack > Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan]) > we concluded it has something to do with checkpoint files. Here are the > detailed logs relevant to checkpoint files. 
> > {code:java} > NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] > o.a.k.s.p.i.ProcessorStateManager : stream-thread > [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task > [1_2] Checkpointable offsets read from checkpoint: {} > NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] > o.a.k.s.p.i.ProcessorStateManager : stream-thread > [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task > [1_2] Restoring state store COMMAND_ID_STORE from changelog topic > Processor-COMMAND_ID_STORE-changelog at checkpoint null > NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] > o.a.k.s.p.i.ProcessorStateManager : stream-thread > [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] > standby-task [1_2] Checkpointable offsets read from checkpoint: {} > NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] > o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file > /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp > NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] > o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file > /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp > /tmp/kafka-streams/Processor/1_2/.checkpoint > NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] > o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file > /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
[jira] [Commented] (KAFKA-10185) Streams should log summarized restoration information at info level
[ https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139955#comment-17139955 ] Boyang Chen commented on KAFKA-10185: - Could we add the context to this ticket? > Streams should log summarized restoration information at info level > --- > > Key: KAFKA-10185 > URL: https://issues.apache.org/jira/browse/KAFKA-10185 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major >
[GitHub] [kafka] dajac commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted
dajac commented on pull request #8672: URL: https://github.com/apache/kafka/pull/8672#issuecomment-646271944 @hachikuji I just rebased and fixed the build issue. Could you re-trigger jenkins please?
[GitHub] [kafka] skaundinya15 commented on a change in pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test
skaundinya15 commented on a change in pull request #8894: URL: https://github.com/apache/kafka/pull/8894#discussion_r442445728

## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ##
@@ -207,23 +212,45 @@ public void close() {
 backup.stop();
 }
+// throw exception after 3 retries, and print expected error messages
+private void assertEqualsWithConsumeRetries(final String errorMsg,
+final int numRecordsProduces,
+final int timeout,
+final ClusterType clusterType,
+final String... topics) throws InterruptedException {
+int retries = 3;
+while (retries-- > 0) {
+try {
+int actualNum = clusterType == ClusterType.PRIMARY ?
+primary.kafka().consume(numRecordsProduces, timeout, topics).count() :
+backup.kafka().consume(numRecordsProduces, timeout, topics).count();
+if (numRecordsProduces == actualNum)
+return;
+} catch (Throwable e) {
+log.error("Could not find enough records with {} retries left", retries, e);
+}
+}
+throw new InterruptedException(errorMsg);
+}
+
 @Test
 public void testReplication() throws InterruptedException {
 MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary"));
 MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup"));
-assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PRODUCED,

Review comment: I'd agree with @ryannedolan here. We could instead use the `waitForCondition` in `TestUtils.java` to wait for the necessary condition. More details are here: https://github.com/apache/kafka/blob/d8cc6fe8e36329c647736773d9d66de89c447409/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L370-L371
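As an illustration of the "wait for a condition" style suggested in the review comment above, here is a minimal, self-contained sketch of a polling helper. It is NOT Kafka's `TestUtils.waitForCondition`; the class name `Waiter`, the parameter list, and the choice to fail with `AssertionError` are assumptions made for this example.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper written for this example; not the Kafka test utility.
class Waiter {
    /**
     * Polls the condition every pollMs milliseconds until it returns true or
     * maxWaitMs milliseconds have elapsed. Fails with an AssertionError carrying
     * the given message on timeout, so a test reports an assertion failure
     * rather than an InterruptedException.
     */
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, long pollMs, String message) {
        long deadlineNanos = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail fast instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + message);
            }
        }
    }

    public static void main(String[] args) {
        int[] polls = {0};
        // The condition becomes true on the third poll.
        waitForCondition(() -> ++polls[0] >= 3, 1000, 1, "condition never became true");
        System.out.println("condition met after " + polls[0] + " polls");
    }
}
```

Compared with a fixed retry count, a deadline-based wait makes the time budget explicit and reports a timeout as a test failure with a descriptive message.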
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139926#comment-17139926 ] Rohan Desai commented on KAFKA-10179: - I'm not sure it's correct to use the same "topic" name for materializing optimized source tables, as it's logically different data. In the normal flow (not recovery), we're taking the topic data, validating/transforming it by deserializing it (which might apply some transforms like projecting just fields of interest), and then serializing it, and then writing it into the store. So the "topic" we pass to the serializer should be different since it represents different data from the source topic. This has consequences in practice when used with a schema registry using the confluent serializers. If we use the same topic, `serialize` might register a different schema with the source subject, which we probably don't want. I think the technically correct thing to do (though this is of course more expensive) would be (when the source table is optimized) to deserialize and serialize each record when restoring. Another issue that I think exists (need to try to reproduce) that deserializing/serializing would solve is skipped validation. The source topic deserializer functions as a sort of validator for records from the source topic. When the streams app is configured to skip on deserialization errors, bad source records are just skipped. However if we restore by just writing those records to the state store, we now hit the deserialization error when reading the state store, which is a query-killing error. 
> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> ---
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.5.0
> Reporter: Bruno Cadonna
> Assignee: Bruno Cadonna
> Priority: Major
> Fix For: 2.7.0
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state
> store to the state store serdes. Currently, it always passes
> {{<application ID>-<store name>-changelog}} as the changelog topic name. However, for
> optimized source tables the changelog topic is the source topic.
> Most serdes do not use the topic name passed to them. However, if the serdes
> actually use the topic name for (de)serialization, e.g., when Kafka Streams
> is used with Confluent's Schema Registry, a
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.
[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139925#comment-17139925 ]

Sophie Blee-Goldman commented on KAFKA-10005:

Yeah, I didn't mean to imply we'd be synchronization-free with only standbys in a separate thread. But at least we'd only need to sync at rebalances, whereas with restoration in a separate thread we'll need to continually check for newly-restored tasks that should be taken over by the main thread. Anyways I agree, a POC and maybe some benchmarking should have the last word

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for
> RocksDB
> ---
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Priority: Major
> Fix For: 2.6.0
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via
> registration to specify the logic of applying a batch of records read from
> the changelog to the store. Used for both updating standby tasks and
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however
> today we allow users to register a per-store RestoreCallback which is also
> implementing the RestoreListener. Such weird mixing is actually motivated by
> Streams internal usage to enable / disable bulk loading inside RocksDB. For
> users however this is less meaningful to specify a callback to be a listener
> since the `onRestoreStart / End` has the storeName passed in, so that users
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check > internally if the passed in per-store callback is also implementing listener, > and if yes trigger their calls, which increases the complexity. Besides, > toggle rocksDB for bulk loading requires us to open / close / reopen / > reclose 4 times during the restoration which could also be costly. > Given that we have KIP-441 in place, I think we should consider different > ways other than toggle bulk loading during restoration for Streams (e.g. > using different threads for restoration). > The proposal for this ticket is to completely decouple the listener from > callback -- i.e. we would not presume users passing in a callback function > that implements both RestoreCallback and RestoreListener, and also for > RocksDB we replace the bulk loading mechanism with other ways of > optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139533#comment-17139533 ]

Guozhang Wang edited comment on KAFKA-10005 at 6/18/20, 6:58 PM:
-----------------------------------------------------------------

So just to have a quick summary, my proposal is primarily threefold:

1) use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to add batches of records as SST files directly; this is to replace the impact of bulk loading.
2) move the restoration off the stream thread to a different thread (pool), for both restoring active tasks and updating standby tasks.
3) if needed, we also disable compaction during the restoration, and do a one-phase full compaction when we complete. I'm keeping it as "optional" for now since disabling compaction has both pros and cons, and if we have good performance from 1) and 2) alone then maybe we can afford to keep compaction enabled.

We already have an internal BulkLoadStore interface which e.g. RocksDBStore extends; we can leverage that interface to "toggle" restoration mode for 1) and 3) above.

cc [~cadonna]
[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139919#comment-17139919 ]

Guozhang Wang commented on KAFKA-10005:
---------------------------------------

I think even if we only move standbys to separate threads, some synchronization is still required for IQ and rebalance needs, but I think we need to get to a POC to have a clear idea of how much thread synchronization overhead would be incurred. We can discuss this further if we see that the synchronization overhead with just standby and with standby + active is very different.
[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139916#comment-17139916 ] Sophie Blee-Goldman commented on KAFKA-10005: - I've been thinking about this lately and I'm not quite convinced we should move restoration to a separate thread as well (only standbys). With KIP-441, the majority of restoration will actually be done as a standby task. Only the last (hopefully-trivial) tail end of the changelog will be restored as an active task. Is that worth the overhead of thread synchronization to hand off tasks between the restore thread(s) and the main one? I'm not sure
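The hand-off discussed in this comment can be pictured with a small sketch. It is only an illustration under stated assumptions, not the Streams implementation: `RestoreHandoffSketch`, `markRestored` and `takeRestored` are hypothetical names, and task IDs are plain strings. A restore thread publishes finished tasks to a lock-free queue, and the main thread drains that queue on each loop iteration, which is the "continually check for newly-restored tasks" cost mentioned in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: none of these names come from Kafka Streams.
final class RestoreHandoffSketch {
    // Thread-safe queue shared by the restore thread and the main thread.
    private final ConcurrentLinkedQueue<String> restored = new ConcurrentLinkedQueue<>();

    /** Called by the (assumed) restore thread once a task has caught up with its changelog. */
    void markRestored(String taskId) {
        restored.add(taskId);
    }

    /** Called by the main thread on every loop iteration; drains the queue without blocking. */
    List<String> takeRestored() {
        List<String> out = new ArrayList<>();
        String id;
        while ((id = restored.poll()) != null) {
            out.add(id);
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        RestoreHandoffSketch handoff = new RestoreHandoffSketch();
        Thread restoreThread = new Thread(() -> {
            handoff.markRestored("0_0");
            handoff.markRestored("0_1");
        });
        restoreThread.start();
        restoreThread.join();
        // The main thread now takes over the tasks the restore thread finished.
        System.out.println(handoff.takeRestored());
    }
}
```

With standbys only in a separate thread, a check like `takeRestored` would be needed at rebalances rather than on every iteration, which is the trade-off the comments weigh.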
[GitHub] [kafka] guozhangwang commented on pull request #8876: KAFKA-10167: use the admin client to read end-offset
guozhangwang commented on pull request #8876: URL: https://github.com/apache/kafka/pull/8876#issuecomment-646234217 Cherry-pick to 2.6. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #8876: KAFKA-10167: use the admin client to read end-offset
guozhangwang merged pull request #8876: URL: https://github.com/apache/kafka/pull/8876
[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on pull request #8683: URL: https://github.com/apache/kafka/pull/8683#issuecomment-646217062 Thanks, @dajac for the comments. I've modified the PR per your suggestions. @rajinisivaram Do you think we can start testing?
[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging
vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442397523

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ##
@@ -415,19 +418,20 @@ public void restore() {
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);

Review comment:
trivial cleanup

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ##
@@ -415,19 +418,20 @@ public void restore() {
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
             } catch (final InvalidOffsetException e) {
-                log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
+                log.warn("Encountered " + e.getClass().getName() +
+                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
                     "the consumer's position has fallen out of the topic partition offset range because the topic was " +
                     "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e.getClass().getName(), e.partitions());
+                    " it later.", e);

Review comment:
Added the exception itself as the "cause" of the warning. The actual message of the IOE is actually pretty good at explaining the root cause.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ##
@@ -446,6 +450,38 @@ public void restore() {
             }

             maybeUpdateLimitOffsetsForStandbyChangelogs();
+
+            maybeLogRestorationProgress();

Review comment:
This is the main change. Once every ten seconds, we will log the progress for each active restoring changelog.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ##
@@ -415,19 +418,20 @@ public void restore() {
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
             } catch (final InvalidOffsetException e) {
-                log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
+                log.warn("Encountered " + e.getClass().getName() +
+                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
                     "the consumer's position has fallen out of the topic partition offset range because the topic was " +
                     "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e.getClass().getName(), e.partitions());
+                    " it later.", e);
                 final Map<TaskId, Collection<TopicPartition>> taskWithCorruptedChangelogs = new HashMap<>();
                 for (final TopicPartition partition : e.partitions()) {
                     final TaskId taskId = changelogs.get(partition).stateManager.taskId();
                     taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
                 }
-                throw new TaskCorruptedException(taskWithCorruptedChangelogs);
+                throw new TaskCorruptedException(taskWithCorruptedChangelogs, e);

Review comment:
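The "once every ten seconds" behaviour described in the review comment can be sketched as a rate-limited summary. The body of `maybeLogRestorationProgress` is not shown in the quoted hunks, so everything here is an assumption for illustration: the class name, the injected clock value `nowMs`, the string-keyed `remaining` map and the message format are all hypothetical, and the method returns the message instead of calling a logger so that it stays self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of interval-based progress logging; not the code from the PR.
final class RestoreProgressLoggerSketch {
    static final long LOG_INTERVAL_MS = 10_000L; // "once every ten seconds"

    private long lastLogMs;

    RestoreProgressLoggerSketch(long startMs) {
        this.lastLogMs = startMs;
    }

    /**
     * Returns a one-line summary if at least LOG_INTERVAL_MS has passed since the
     * last summary and something is still restoring; otherwise returns null.
     * The map goes from changelog partition name to records left to restore.
     */
    String maybeLogRestorationProgress(long nowMs, Map<String, Long> remaining) {
        if (nowMs - lastLogMs < LOG_INTERVAL_MS || remaining.isEmpty()) {
            return null;
        }
        lastLogMs = nowMs;
        StringBuilder sb = new StringBuilder("Restoration in progress for ")
            .append(remaining.size()).append(" partitions.");
        remaining.forEach((partition, left) ->
            sb.append(" {").append(partition).append(": ").append(left).append(" remaining}"));
        return sb.toString();
    }

    public static void main(String[] args) {
        RestoreProgressLoggerSketch logger = new RestoreProgressLoggerSketch(0L);
        Map<String, Long> remaining = new LinkedHashMap<>();
        remaining.put("app-store-changelog-0", 42L);
        System.out.println(logger.maybeLogRestorationProgress(5_000L, remaining));  // too early
        System.out.println(logger.maybeLogRestorationProgress(10_000L, remaining)); // summary
    }
}
```

Passing the time in as a parameter keeps the interval check easy to exercise without sleeping.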
[GitHub] [kafka] hachikuji commented on pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment
hachikuji commented on pull request #8891: URL: https://github.com/apache/kafka/pull/8891#issuecomment-646209534 Had some discussion offline with @cmccabe . The intention in KIP-455 is to use --additional to resubmit the reassignment and change the quota. I found that this did not work as expected when I tried it, so let me try to modify this patch so that the new integration tests use this behavior. Possibly we don't need --alter-throttle, but the tests and improved documentation would still be useful.
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442394058

## File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java ##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;
+        this.ratio = ratio;
+        this.jitter = jitter;
+        this.expMax = termMax > scaleFactor ?
+                Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+    }
+
+    public long term(long n) {

Review comment:
As we noticed in your earlier comments, the same value of `attempts` may correspond to different terms.

connection_timeout = constant * 2 ^ (attempts)
reconnect_backoff = constant * 2 ^ (attempts - 1)
(in KIP-580) retry_backoff = constant * 2 ^ (attempts - 1)

So I think using `retries` or `attempts` instead of `n` might also confuse people. Shall we think of another name?
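The formula in the quoted Javadoc, Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * ratio ^ n with a cap at termMax, can be sketched as below. The quoted hunk stops at the signature of `term`, so the method body here is an assumption about how the cap and jitter might be combined, not the code from the PR; `BackoffSketch` and `base` are hypothetical names. The constructor mirrors the one in the hunk, and the values in `main` reuse the 10 second and 127 second defaults mentioned elsewhere in this thread.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of a capped geometric progression with jitter.
final class BackoffSketch {
    private final long scaleFactor;
    private final int ratio;
    private final double jitter;
    private final double expMax; // largest exponent before termMax would be exceeded

    BackoffSketch(long scaleFactor, int ratio, long termMax, double jitter) {
        this.scaleFactor = scaleFactor;
        this.ratio = ratio;
        this.jitter = jitter;
        this.expMax = termMax > scaleFactor
            ? Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio)
            : 0;
    }

    /** Term without jitter: scaleFactor * ratio^min(n, expMax), so it never exceeds termMax (up to rounding). */
    double base(long n) {
        return scaleFactor * Math.pow(ratio, Math.min(n, expMax));
    }

    /** Assumed body: multiply the capped term by a random factor in [1 - jitter, 1 + jitter). */
    long term(long n) {
        double randomFactor = jitter > 0
            ? ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter)
            : 1.0;
        return (long) (randomFactor * base(n));
    }

    public static void main(String[] args) {
        BackoffSketch timeout = new BackoffSketch(10_000L, 2, 127_000L, 0.2);
        for (int n = 0; n <= 5; n++) {
            System.out.println("n=" + n + " term=" + timeout.term(n) + " ms");
        }
    }
}
```

Whether `n` counts attempts or attempts minus one is exactly the naming question raised in the review comment; the sketch leaves that offset to the caller.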
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442389711

## File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java ##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
Good idea. Will go for `ExponentialBackoff`
[GitHub] [kafka] vvcephei opened a new pull request #8896: KAFKA-10185: Restoration info logging
vvcephei opened a new pull request #8896:
URL: https://github.com/apache/kafka/pull/8896

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442387660

## File path: clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java ##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class GeometricProgressionTest {
+    @Test
+    public void testGeometricProgression() {
+        long scaleFactor = 100;
+        int ratio = 2;
+        long termMax = 2000;
+        double jitter = 0.2;
+        GeometricProgression geometricProgression = new GeometricProgression(
+                scaleFactor, ratio, termMax, jitter
+        );
+
+        for (int i = 0; i <= 100; i++) {
+            for (int n = 0; n <= 4; n++) {
+                assertEquals(scaleFactor * Math.pow(ratio, n), geometricProgression.term(n),
+                        scaleFactor * Math.pow(ratio, n) * jitter);
+            }
+            System.out.println(geometricProgression.term(5));

Review comment:
Oh, right. I forgot to remove it.
[jira] [Created] (KAFKA-10185) Streams should log summarized restoration information at info level
John Roesler created KAFKA-10185:
------------------------------------

             Summary: Streams should log summarized restoration information at info level
                 Key: KAFKA-10185
                 URL: https://issues.apache.org/jira/browse/KAFKA-10185
             Project: Kafka
          Issue Type: Task
          Components: streams
            Reporter: John Roesler
            Assignee: John Roesler
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442382471

## File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java ##
@@ -149,6 +155,16 @@
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+                                        Type.LONG,
+                                        10 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+                                        Type.LONG,
+                                        127 * 1000,

Review comment:
Sounds good. Will refactor.
[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment
cmccabe commented on a change in pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#discussion_r442378048

## File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ##
@@ -930,6 +935,38 @@ object ReassignPartitionsCommand extends Logging {
     (brokerListToReassign, topicsToReassign)
   }

+  /**
+   * The entry point for --alter-throttles. At least one throttle value must be provided.
+   *
+   * @param admin The Admin instance to use
+   * @param interBrokerThrottle The new inter-broker throttle or -1 to leave it unchanged
+   * @param logDirThrottle The new alter-log-dir throttle or -1 to leave it unchanged
+   */
+  def alterThrottles(admin: Admin,
+                     interBrokerThrottle: Long,
+                     logDirThrottle: Long): Unit = {
+    if (interBrokerThrottle < 0 && logDirThrottle < 0) {
+      throw new TerseReassignmentFailureException("No valid throttle values provided to --alter-throttle")

Review comment:
It would be good to include the flags needed to pass in a throttle in this message.
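The review suggestion amounts to naming the relevant flags in the error text. A minimal Java sketch of that idea follows; the quoted code is Scala and throws `TerseReassignmentFailureException`, so the class name, the use of `IllegalArgumentException` and the exact wording here are assumptions for illustration only. `--throttle` and `--replica-alter-log-dirs-throttle` are the existing throttle options of the reassignment tool; `--alter-throttle` is the option proposed in this PR.

```java
// Hypothetical sketch: validation that tells the user which flags supply a throttle.
final class AlterThrottlesCheckSketch {
    /** Throws if neither throttle is set; -1 means "leave unchanged", as in the quoted Scaladoc. */
    static void validate(long interBrokerThrottle, long logDirThrottle) {
        if (interBrokerThrottle < 0 && logDirThrottle < 0) {
            throw new IllegalArgumentException(
                "No valid throttle values provided to --alter-throttle. "
                    + "Specify --throttle and/or --replica-alter-log-dirs-throttle "
                    + "with a non-negative value.");
        }
    }

    public static void main(String[] args) {
        validate(1_000_000L, -1L); // accepted: only the inter-broker throttle changes
        try {
            validate(-1L, -1L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```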
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442375424 ## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ## @@ -786,6 +808,29 @@ private void handleAbortedSends(List responses) { abortedSends.clear(); } +/** + * Handle socket channel connection timeout. The timeout will hit iff a connection + * stays at the ConnectionState.CONNECTING state longer than the timeout value, + * as indicated by ClusterConnectionStates.NodeConnectionState. + * + * @param responses The list of responses to update + * @param now The current time + */ +private void handleTimeoutConnections(List responses, long now) { +Set connectingNodes = connectionStates.connectingNodes(); +for (String nodeId: connectingNodes) { Review comment: Refactored
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442372873 ## File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ## @@ -103,6 +103,12 @@ Utils.join(SecurityProtocol.names(), ", ") + "."; public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT"; +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms"; +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds."; + +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms"; +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the initial socket connection to be built. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. The default value will be 127 seconds."; Review comment: Refactored
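The backoff described in SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC can be sketched roughly as follows. This is an illustration only, not the code from the pull request: the class and method names are hypothetical, the doubling multiplier is an assumption (the doc string only says the timeout grows exponentially), and capping the value before applying the 0.2 jitter is also an assumption.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of an exponentially growing connection setup timeout
// with +/-20% jitter. Not the implementation from PR #8683.
final class ConnectionSetupTimeoutSketch {
    // Randomization factor quoted in the doc string.
    static final double JITTER = 0.2;
    // Assumption: the timeout doubles per consecutive failure.
    static final int MULTIPLIER = 2;

    // Timeout before jitter: initialMs * 2^failures, capped at maxMs.
    static long baseTimeoutMs(int failures, long initialMs, long maxMs) {
        double grown = initialMs * Math.pow(MULTIPLIER, failures);
        return (long) Math.min(grown, maxMs);
    }

    // Applies a factor in [1 - JITTER, 1 + JITTER) chosen by random01,
    // which must lie in [0, 1). Passing it in keeps the method testable.
    static long jitteredTimeoutMs(int failures, long initialMs, long maxMs, double random01) {
        long base = baseTimeoutMs(failures, initialMs, maxMs);
        double factor = 1.0 - JITTER + 2 * JITTER * random01;
        return (long) (base * factor);
    }

    public static void main(String[] args) {
        // Defaults mentioned in the doc strings: 10 s initial, 127 s maximum.
        for (int failures = 0; failures <= 5; failures++) {
            double r = ThreadLocalRandom.current().nextDouble();
            System.out.println(failures + " failures -> "
                + jitteredTimeoutMs(failures, 10_000L, 127_000L, r) + " ms");
        }
    }
}
```

With the 10 second and 127 second values from the doc strings, the uncapped sequence under these assumptions is 10 s, 20 s, 40 s, 80 s, after which the cap applies.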
[GitHub] [kafka] hachikuji commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted
hachikuji commented on pull request #8672: URL: https://github.com/apache/kafka/pull/8672#issuecomment-646173735
[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
hachikuji commented on pull request #8822: URL: https://github.com/apache/kafka/pull/8822#issuecomment-646168538 retest this please
[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139548#comment-17139548 ] Sophie Blee-Goldman commented on KAFKA-10184: - Or instead (or in addition to the above) maybe we should wait for the streams to be in RUNNING before we start the timeout for writing records > Flaky > HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores > -- > > Key: KAFKA-10184 > URL: https://issues.apache.org/jira/browse/KAFKA-10184 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Minor > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 12. Input > records haven't all been written to the changelog: 442 > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at >
[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139546#comment-17139546 ] Sophie Blee-Goldman commented on KAFKA-10184: - [~vvcephei] Could we maybe do something like "write as many records as you can in the 12ms timeout"? Since as you said the whole point is just to make sure we have enough records to represent a "reasonably large" number, if it takes 2 minutes to write only 100 records then those 100 records represent a heavy load (apparently...)
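The "write as many records as you can within the timeout" idea from the comment on KAFKA-10184 could look something like the sketch below. It is only an illustration of the suggestion, not code from the test: the class name, the method name, and the injected clock and writer are all hypothetical, and a real test would call a Kafka producer where this sketch calls `writeOne`.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: write records until either a time budget or a
// record cap is reached, and report how many were actually written.
final class TimeBoundedWriterSketch {
    static int writeUntilDeadline(LongSupplier clockMs,
                                  long budgetMs,
                                  int maxRecords,
                                  Runnable writeOne) {
        long deadline = clockMs.getAsLong() + budgetMs;
        int written = 0;
        // Stop at whichever limit is hit first.
        while (written < maxRecords && clockMs.getAsLong() < deadline) {
            writeOne.run();
            written++;
        }
        return written;
    }

    public static void main(String[] args) {
        // Stand-in for a producer send: here it only burns a little CPU time.
        int n = writeUntilDeadline(System::currentTimeMillis, 50L, 1_000_000,
            () -> Math.sqrt(System.nanoTime()));
        System.out.println("records written within budget: " + n);
    }
}
```

The test would then assert on however many records were written instead of waiting for a fixed count, so a slow build machine shortens the input rather than failing the setup.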
[GitHub] [kafka] edoardocomar commented on pull request #4204: KAFKA-5238: BrokerTopicMetrics can be recreated after topic is deleted
edoardocomar commented on pull request #4204: URL: https://github.com/apache/kafka/pull/4204#issuecomment-646162106 After @hachikuji's fixes in https://github.com/apache/kafka/pull/8586, the metrics are no longer ticked at the end of a DelayedFetch, so the time window for topic deletion is almost nonexistent and the only guard code needed is left in `KafkaApis`, as shown by the unit test added by this PR. This PR is now tiny, and it would be nice to have it merged :-)
[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139543#comment-17139543 ] Sophie Blee-Goldman commented on KAFKA-10184: - Yeah, it's failing on the setup and hasn't even gotten to the real test at all cc/ [~vvcephei] seems like "500" was still too high :/
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442363632 ## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ## @@ -678,7 +696,11 @@ public Node leastLoadedNode(long now) { } else if (connectionStates.isPreparingConnection(node.idString())) { foundConnecting = node; } else if (canConnect(node, now)) { -foundCanConnect = node; +if (foundCanConnect == null || + this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) > + this.connectionStates.lastConnectAttemptMs(node.idString())) { +foundCanConnect = node; +} Review comment: Yes
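The selection rule in the leastLoadedNode hunk (among nodes that can be connected to, prefer the one whose last connection attempt is oldest) can be shown in isolation. The sketch below is a simplified stand-in, not the NetworkClient code: the class name, the method name, and the plain map of timestamps are hypothetical, and nodes without a recorded attempt are treated as timestamp 0, mirroring the null check in `lastConnectAttemptMs` from the same pull request.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "pick the connectable node with the oldest
// last connection attempt". Not the NetworkClient implementation.
final class LeastRecentlyAttemptedSketch {
    static String pick(List<String> connectableIds, Map<String, Long> lastAttemptMs) {
        String found = null;
        for (String id : connectableIds) {
            // Same comparison as the diff: replace the candidate only if the
            // new node's last attempt is strictly older.
            if (found == null
                    || lastAttemptMs.getOrDefault(found, 0L) > lastAttemptMs.getOrDefault(id, 0L)) {
                found = id;
            }
        }
        return found; // null when there is no connectable node
    }

    public static void main(String[] args) {
        Map<String, Long> attempts = new HashMap<>();
        attempts.put("a", 300L);
        attempts.put("b", 100L);
        attempts.put("c", 200L);
        System.out.println(pick(Arrays.asList("a", "b", "c"), attempts));
    }
}
```

Because the comparison is strict, ties keep the node seen first, and a node that has never been tried wins over any node with a recorded attempt.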
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442362673 ## File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ## @@ -103,6 +103,12 @@ Utils.join(SecurityProtocol.names(), ", ") + "."; public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT"; +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms"; +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds."; Review comment: Makes sense. I'll change the description and remove the defaults in the doc.
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) { return state; } +/** + * Get the id set of nodes which are in CONNECTING state + */ +public Set connectingNodes() { +return this.connectingNodes; +} + +/** + * Get the timestamp of the latest connection attempt of a given node + * @param id the connection to fetch the state for + */ +public long lastConnectAttemptMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState == null ? 0 : nodeState.lastConnectAttemptMs; +} + +public long connectionSetupTimeoutMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState.connectionSetupTimeoutMs; +} + +/** + * Test if the connection to the given node has reached its timeout + * @param id the connection to fetch the state for + * @param now the current time in ms + */ +public boolean isConnectionSetupTimeout(String id, long now) { +return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id); Review comment: I think so. The `lastConnectAttemptMs` is updated in both `connecting` (Line 145 & Line 157) and `disconnected`.
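The timeout test in the ClusterConnectionStates hunk reduces to comparing the time elapsed since the last connection attempt against the per-node setup timeout. A self-contained sketch of that check follows. It is not the class from the pull request: the names are hypothetical, and returning false for an unknown node is an assumption made here so the sketch never dereferences a missing state (in the quoted diff, `connectionSetupTimeoutMs` has no null check).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a per-node connection setup timeout check.
final class SetupTimeoutCheckSketch {
    private static final class NodeState {
        final long lastConnectAttemptMs;
        final long connectionSetupTimeoutMs;

        NodeState(long lastConnectAttemptMs, long connectionSetupTimeoutMs) {
            this.lastConnectAttemptMs = lastConnectAttemptMs;
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    private final Map<String, NodeState> states = new HashMap<>();

    // Record that a connection attempt to the node started at time now.
    void connecting(String id, long now, long setupTimeoutMs) {
        states.put(id, new NodeState(now, setupTimeoutMs));
    }

    // True once strictly more than the setup timeout has elapsed since the
    // last attempt. Assumption: unknown nodes are never reported as timed out.
    boolean isConnectionSetupTimeout(String id, long now) {
        NodeState state = states.get(id);
        if (state == null) {
            return false;
        }
        return now - state.lastConnectAttemptMs > state.connectionSetupTimeoutMs;
    }

    public static void main(String[] args) {
        SetupTimeoutCheckSketch sketch = new SetupTimeoutCheckSketch();
        sketch.connecting("1", 1_000L, 10_000L);
        System.out.println(sketch.isConnectionSetupTimeout("1", 11_001L));
    }
}
```

As in the diff, the comparison is strict, so a connection whose elapsed time equals the timeout exactly is not yet considered timed out.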
[jira] [Updated] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10184: -- Issue Type: Test (was: Bug)
[jira] [Updated] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10184: -- Priority: Minor (was: Major)
[jira] [Created] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
Guozhang Wang created KAFKA-10184: - Summary: Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores Key: KAFKA-10184 URL: https://issues.apache.org/jira/browse/KAFKA-10184 Project: Kafka Issue Type: Bug Components: streams, unit tests Reporter: Guozhang Wang {code} Stacktrace java.lang.AssertionError: Condition not met within timeout 12. Input records haven't all been written to the changelog: 442 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149) at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) { return state; } +/** + * Get the id set of nodes which are in CONNECTING state + */ +public Set connectingNodes() { +return this.connectingNodes; +} + +/** + * Get the timestamp of the latest connection attempt of a given node + * @param id the connection to fetch the state for + */ +public long lastConnectAttemptMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState == null ? 0 : nodeState.lastConnectAttemptMs; +} + +public long connectionSetupTimeoutMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState.connectionSetupTimeoutMs; +} + +/** + * Test if the connection to the given node has reached its timeout + * @param id the connection to fetch the state for + * @param now the current time in ms + */ +public boolean isConnectionSetupTimeout(String id, long now) { +return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id); Review comment: Good catch. I'll make the logic record it in both `connecting` and `disconnected`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442354307 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) { return state; } +/** + * Get the id set of nodes which are in CONNECTING state + */ +public Set connectingNodes() { +return this.connectingNodes; +} + +/** + * Get the timestamp of the latest connection attempt of a given node + * @param id the connection to fetch the state for + */ +public long lastConnectAttemptMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState == null ? 0 : nodeState.lastConnectAttemptMs; +} + +public long connectionSetupTimeoutMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState.connectionSetupTimeoutMs; Review comment: No. The caller will ensure that the node is in the connecting state. I'll add an IllegalStateException here.
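For readers following the review thread, here is a minimal, self-contained sketch of the timeout check with the IllegalStateException guard mentioned in the comment above. It is an illustration only, not the code merged in the pull request: the class `ConnectionTimeoutSketch`, the nested `NodeState` type, and the `recordConnecting` helper are hypothetical stand-ins for `ClusterConnectionStates` and `NodeConnectionState`.

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectionTimeoutSketch {

    // Hypothetical, simplified stand-in for NodeConnectionState.
    static final class NodeState {
        final long lastConnectAttemptMs;
        final long connectionSetupTimeoutMs;

        NodeState(long lastConnectAttemptMs, long connectionSetupTimeoutMs) {
            this.lastConnectAttemptMs = lastConnectAttemptMs;
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    private final Map<String, NodeState> nodeState = new HashMap<>();

    // Hypothetical helper: record that a connection attempt started at `now`.
    public void recordConnecting(String id, long now, long timeoutMs) {
        nodeState.put(id, new NodeState(now, timeoutMs));
    }

    // Mirrors the diff: unknown nodes report 0 as their last attempt time.
    public long lastConnectAttemptMs(String id) {
        NodeState state = nodeState.get(id);
        return state == null ? 0 : state.lastConnectAttemptMs;
    }

    // Fails fast instead of hitting a NullPointerException for an unknown node.
    public long connectionSetupTimeoutMs(String id) {
        NodeState state = nodeState.get(id);
        if (state == null)
            throw new IllegalStateException("No connection state found for node " + id);
        return state.connectionSetupTimeoutMs;
    }

    // True once more than the configured setup timeout has elapsed
    // since the last connection attempt.
    public boolean isConnectionSetupTimeout(String id, long now) {
        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
    }
}
```

With this shape, a caller that asks about a node it never started connecting to gets an explicit error rather than a null dereference, which appears to be the intent of the review comment.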
[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139533#comment-17139533 ] Guozhang Wang commented on KAFKA-10005: --- So just to give a quick summary, my proposal is primarily threefold: 1) use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to add batches of records as SST files directly; this is to replace the effect of bulk loading. 2) move the restoration off the stream thread to a different thread (pool), both for restoring active tasks and for updating standby tasks. 3) if needed, we also disable compaction during the restoration, and do a one-phase full compaction when we complete. We already have an internal BulkLoadStore interface which e.g. RocksDBStore extends; we can leverage that interface to "toggle" restoration mode for 1) and 3) above. cc [~cadonna] > Decouple RestoreListener from RestoreCallback and not enable bulk loading for > RocksDB > - > > Key: KAFKA-10005 > URL: https://issues.apache.org/jira/browse/KAFKA-10005 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.6.0 > > > In Kafka Streams we have two restoration callbacks: > * RestoreCallback (BatchingRestoreCallback): specified per-store via > registration to specify the logic of applying a batch of records read from > the changelog to the store. Used for both updating standby tasks and > restoring active tasks. > * RestoreListener: specified per-instance via `setRestoreListener`, to > specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`. > As we can see, these two callbacks serve quite different purposes; however, > today we allow users to register a per-store RestoreCallback which also > implements the RestoreListener. Such weird mixing is actually motivated by > Streams internal usage to enable / disable bulk loading inside RocksDB. 
For > users, however, it is less meaningful to specify a callback that is also a listener, > since `onRestoreStart / End` has the storeName passed in, so users > can just define different listening logic for different stores if needed. > On the other hand, this mixing of two callbacks forces Streams to check > internally whether the passed-in per-store callback also implements the listener, > and if so trigger its calls, which increases complexity. Besides, > toggling RocksDB for bulk loading requires us to open / close / reopen / > reclose 4 times during restoration, which could also be costly. > Given that we have KIP-441 in place, I think we should consider > ways other than toggling bulk loading during restoration for Streams (e.g. > using different threads for restoration). > The proposal for this ticket is to completely decouple the listener from the > callback -- i.e. we would not presume users pass in a callback function > that implements both RestoreCallback and RestoreListener, and also for > RocksDB we replace the bulk loading mechanism with other ways of > optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
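The decoupling proposed in the ticket can be sketched roughly as follows. This is an illustrative assumption, not Kafka Streams code: the interfaces `RestoreCallback` and `RestoreListener` below are simplified hypothetical versions defined inside the sketch, and the `restore` driver is invented for the example. The point it tries to show is that the per-store callback only applies records, the per-instance listener only observes progress keyed by store name, and the driver never checks whether one object implements both.

```java
import java.util.List;

public class DecoupledRestoreSketch {

    // Hypothetical per-store callback: applies a batch of changelog records to a store.
    public interface RestoreCallback {
        void restoreBatch(List<String> records);
    }

    // Hypothetical per-instance listener: observes progress, distinguished by store name.
    public interface RestoreListener {
        void onRestoreStart(String storeName);
        void onBatchRestored(String storeName, int numRestored);
        void onRestoreEnd(String storeName, int totalRestored);
    }

    // Hypothetical restore driver. The callback and the listener are passed
    // separately, so no instanceof check on the callback is needed.
    public static int restore(String storeName,
                              List<List<String>> batches,
                              RestoreCallback callback,
                              RestoreListener listener) {
        listener.onRestoreStart(storeName);
        int total = 0;
        for (List<String> batch : batches) {
            callback.restoreBatch(batch);
            total += batch.size();
            listener.onBatchRestored(storeName, batch.size());
        }
        listener.onRestoreEnd(storeName, total);
        return total;
    }
}
```

A listener that needs store-specific behaviour can branch on the `storeName` argument, which is the alternative the ticket suggests to registering a combined callback and listener per store.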
[GitHub] [kafka] ijuma commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
ijuma commented on pull request #7929: URL: https://github.com/apache/kafka/pull/7929#issuecomment-646140719 ok to test
[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
hachikuji commented on pull request #8850: URL: https://github.com/apache/kafka/pull/8850#issuecomment-646126053 retest this please
[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
hachikuji commented on pull request #8850: URL: https://github.com/apache/kafka/pull/8850#issuecomment-646125880 retest this please
[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
hachikuji commented on pull request #8850: URL: https://github.com/apache/kafka/pull/8850#issuecomment-646125726 retest this please