[jira] [Created] (KAFKA-9934) Information and doc update needed for support of AclAuthorizer when protocol is PLAINTEXT
kaushik srinivas created KAFKA-9934: --- Summary: Information and doc update needed for support of AclAuthorizer when protocol is PLAINTEXT Key: KAFKA-9934 URL: https://issues.apache.org/jira/browse/KAFKA-9934 Project: Kafka Issue Type: Improvement Components: security Affects Versions: 2.4.1 Reporter: kaushik srinivas Need information on the case where the protocol is PLAINTEXT for listeners in Kafka. Does authorization apply when the protocol is PLAINTEXT? If so, what would be used as the principal name for the authorization ACL validations? There is no doc that describes this case. Need info and a doc update for the same. Thanks, kaushik. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Georgi Petkov updated KAFKA-9921: - Summary: Caching is not working properly with WindowStateStore when retaining duplicates (was: Caching is not working properly with WindowStateStore when rataining duplicates) > Caching is not working properly with WindowStateStore when retaining > duplicates > --- > > Key: KAFKA-9921 > URL: https://issues.apache.org/jira/browse/KAFKA-9921 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Georgi Petkov >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0, 2.5.1 > > > I'm using the current latest version 2.5.0, but this is not something new. > I have _WindowStateStore_ configured as follows (where _true_ stands for > the _retainDuplicates_ parameter): > _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, > retentionPeriod, windowSize, *true*), keySerde, > valueSerde)*.withCachingEnabled()*)_ > If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that > order, when reading them through the iterator I'll get the values *4, 2, 3, 4*. > I've done a bit of investigation myself and the problem is that *the whole > caching feature is written without consideration of the case where duplicates > are retained*. > The observed behavior is due to having the last value in the cache (and it > can have only one since it's not aware of the retain-duplicates option), and > it is read first (while skipping the first entry from the RocksDB iterator even > though the values are different). This can be observed (for version 2.5.0) in > _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 > values are read from the RocksDB iterator, so they are as expected. 
> As I said, the whole feature does not consider the _retainDuplicates_ option, > so there are other examples of incorrect behavior, like in > _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you > would skip one duplicate entry in the RocksDB iterator for the given key. > In my use case, I want to persist a list of values for a given key without > increasing the complexity to linear for a single event (which would be the > case if I was always reading the current list, appending one value, and writing > it back). So I go for _List>_ instead of _KeyValuePair List>_. The whole use case is more complex than that, so I use > _#transformValues_ and state stores. > So as an impact, I can't use caching on my state stores. For others - they'll > have incorrect behavior that may take a lot of time to be discovered and even > more time to fix the results.
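For readers trying to follow the report above, the merged-iterator behavior can be sketched as a self-contained simulation (plain Java collections stand in for the cache and the RocksDB store; the class and method names here are illustrative, not Kafka's actual classes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the reported bug: the cache keeps only the latest value
// per key, while the underlying store retains every duplicate. The merged
// iterator serves the cached value first and skips one store entry for the
// same key, even though their values differ.
public class MergedIteratorSim {
    public static List<Integer> read(String key, List<Integer> storeValues) {
        Map<String, Integer> cache = new HashMap<>();
        for (int v : storeValues) {
            cache.put(key, v); // cache is unaware of retainDuplicates: one slot per key
        }
        List<Integer> result = new ArrayList<>();
        // Merged iterator: the cache entry for the key wins, and the first
        // store entry with the same key is skipped (mirrors the behavior
        // described for AbstractMergedSortedCacheStoreIterator#next).
        result.add(cache.get(key));
        int storeIndex = 1; // first duplicate skipped
        while (storeIndex < storeValues.size()) {
            result.add(storeValues.get(storeIndex++));
        }
        return result;
    }

    public static void main(String[] args) {
        // Writing 1, 2, 3, 4 for the same key reads back as 4, 2, 3, 4.
        System.out.println(read("k", List.of(1, 2, 3, 4)));
    }
}
```

The simulation reproduces the reported symptom: the single cached entry shadows the first duplicate in the store, so one of the retained values is lost.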
[jira] [Created] (KAFKA-9933) Need doc update on the AclAuthorizer when SASL_SSL is the protocol used.
kaushik srinivas created KAFKA-9933: --- Summary: Need doc update on the AclAuthorizer when SASL_SSL is the protocol used. Key: KAFKA-9933 URL: https://issues.apache.org/jira/browse/KAFKA-9933 Project: Kafka Issue Type: Improvement Components: security Affects Versions: 2.4.1 Reporter: kaushik srinivas Hello, The documentation on the usage of the authorizer does not describe the principal used when the protocol for the listener is SASL + SSL (SASL_SSL). Suppose Kerberos and SSL are enabled together: will authorization be based on the Kerberos principal names or on the SSL certificate DN names? There is no document covering this part of the use case. This needs an information and documentation update. Thanks, Kaushik.
[GitHub] [kafka] C0urante commented on a change in pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd
C0urante commented on a change in pull request #8554: URL: https://github.com/apache/kafka/pull/8554#discussion_r417070806 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -281,9 +281,18 @@ private void readToLogEnd() { Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator(); while (it.hasNext()) { Map.Entry<TopicPartition, Long> entry = it.next(); -if (consumer.position(entry.getKey()) >= entry.getValue()) +TopicPartition topicPartition = entry.getKey(); +Long endOffset = entry.getValue(); Review comment: Fair enough; added. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd
C0urante commented on pull request #8554: URL: https://github.com/apache/kafka/pull/8554#issuecomment-620992508 Thanks @kkonstantine! These all seem like reasonable suggestions and I've applied them all. Ready for the next round when you have time.
[GitHub] [kafka] ijuma opened a new pull request #8582: KAFKA-9932: Don't load configs from ZK when the log has already been loaded
ijuma opened a new pull request #8582: URL: https://github.com/apache/kafka/pull/8582 If a broker contains 8k replicas, we would previously issue 8k ZK calls to retrieve topic configs when processing the first LeaderAndIsr request. That should translate to 0 after these changes. Credit to @junrao for identifying the problem. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-9932) First LeaderAndIsrRequest can take long due to unnecessary ZK read
Ismael Juma created KAFKA-9932: -- Summary: First LeaderAndIsrRequest can take long due to unnecessary ZK read Key: KAFKA-9932 URL: https://issues.apache.org/jira/browse/KAFKA-9932 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma [~junrao] found the following issue: {quote}In Partition, we have the following code. fetchLogConfig is passed in to logManager.getOrCreateLog by value. This can increase the processing time for the very first LeaderAndIsrRequest since every partition has to do a ZK read to load the log config. This is unnecessary if the log is always present and loaded during the initialization of LogManager. {quote}
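The idea behind the fix can be sketched as a hypothetical lookup that consults the already-loaded log before falling back to ZooKeeper (all names here are illustrative, not Kafka's actual LogManager API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the idea behind the fix (names are illustrative,
// not Kafka's actual API): fall back to the expensive ZooKeeper read only
// when the log was not already loaded during LogManager initialization.
public class LazyConfigLookup {
    static int zkReads = 0;

    static String fetchConfigFromZk(String topicPartition) {
        zkReads++; // each call models one ZooKeeper round trip
        return "config-from-zk";
    }

    static String getLogConfig(String topicPartition, Map<String, String> loadedLogs) {
        // If the log (and therefore its config) is already present, skip ZK entirely.
        return Optional.ofNullable(loadedLogs.get(topicPartition))
                .orElseGet(() -> fetchConfigFromZk(topicPartition));
    }

    public static void main(String[] args) {
        Map<String, String> loadedLogs = new HashMap<>();
        loadedLogs.put("topic-0", "config-from-log-manager");
        getLogConfig("topic-0", loadedLogs); // already loaded: no ZK read
        getLogConfig("topic-1", loadedLogs); // not loaded: one ZK read
        System.out.println(zkReads); // 1
    }
}
```

With 8k replicas already loaded, the fallback never fires, which is why the PR above expects the ZK call count to drop to zero.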
[jira] [Updated] (KAFKA-9932) First LeaderAndIsrRequest can take long due to unnecessary ZK read
[ https://issues.apache.org/jira/browse/KAFKA-9932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-9932: --- Fix Version/s: 2.6.0 > First LeaderAndIsrRequest can take long due to unnecessary ZK read > -- > > Key: KAFKA-9932 > URL: https://issues.apache.org/jira/browse/KAFKA-9932 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 2.6.0 > > > [~junrao] found the following issue: > {quote}In Partition, we have the following code. fetchLogConfig is passed in > to logManager.getOrCreateLog by value. This can increase the processing time > for the very first LeaderAndIsrRequest since every partition has to do a ZK > read to load the log config. This is unnecessary if the log is always present > and loaded during the initialization of LogManager. > {quote}
[GitHub] [kafka] kkonstantine commented on a change in pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd
kkonstantine commented on a change in pull request #8554: URL: https://github.com/apache/kafka/pull/8554#discussion_r417055199 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -281,9 +281,18 @@ private void readToLogEnd() { Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator(); while (it.hasNext()) { Map.Entry<TopicPartition, Long> entry = it.next(); -if (consumer.position(entry.getKey()) >= entry.getValue()) +TopicPartition topicPartition = entry.getKey(); +Long endOffset = entry.getValue(); +long lastConsumedOffset = consumer.position(topicPartition); +if (lastConsumedOffset >= endOffset) { +log.trace("Reached end offset {} for {}", endOffset, topicPartition); it.remove(); -else { +} else { +log.trace( +"Behind end offset {} for {}; last-consumed offset is {}", +endOffset, +topicPartition, +lastConsumedOffset); Review comment: ```suggestion log.trace("Behind end offset {} for {}; last-consumed offset is {}", endOffset, topicPartition, lastConsumedOffset); ``` nit: multiline calls don't need to be on their own line in AK and tab is equal to 4 spaces (here we need 2 tabs) ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -281,9 +281,18 @@ private void readToLogEnd() { Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator(); while (it.hasNext()) { Map.Entry<TopicPartition, Long> entry = it.next(); -if (consumer.position(entry.getKey()) >= entry.getValue()) +TopicPartition topicPartition = entry.getKey(); +Long endOffset = entry.getValue(); Review comment: unboxing will happen in the comparison in the `if` branch anyway, so it's probably better to do it early by declaring the type `long` here. 
## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -281,9 +281,18 @@ private void readToLogEnd() { Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator(); while (it.hasNext()) { Map.Entry<TopicPartition, Long> entry = it.next(); -if (consumer.position(entry.getKey()) >= entry.getValue()) +TopicPartition topicPartition = entry.getKey(); +Long endOffset = entry.getValue(); +long lastConsumedOffset = consumer.position(topicPartition); +if (lastConsumedOffset >= endOffset) { +log.trace("Reached end offset {} for {}", endOffset, topicPartition); Review comment: given that the previous messages say "Reading to ..." maybe it would make sense to say: ```suggestion log.trace("Read to end offset {} for {}", endOffset, topicPartition); ``` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -281,9 +281,18 @@ private void readToLogEnd() { Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator(); while (it.hasNext()) { Map.Entry<TopicPartition, Long> entry = it.next(); -if (consumer.position(entry.getKey()) >= entry.getValue()) +TopicPartition topicPartition = entry.getKey(); +Long endOffset = entry.getValue(); +long lastConsumedOffset = consumer.position(topicPartition); +if (lastConsumedOffset >= endOffset) { +log.trace("Reached end offset {} for {}", endOffset, topicPartition); it.remove(); -else { +} else { +log.trace( +"Behind end offset {} for {}; last-consumed offset is {}", +endOffset, +topicPartition, +lastConsumedOffset); Review comment: Similar to the above, seeing a message that says `read` might be easier to read in context than `consumed`. How about: `Behind end offset {} for {}; last-read offset is {}`
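Putting the review suggestions together, the shape of the loop under discussion can be sketched as a standalone program (a plain map stands in for the consumer's positions, and `System.out.printf` stands in for `log.trace`; this is a sketch, not the actual KafkaBasedLog code):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Standalone sketch of the readToLogEnd loop shape under review.
public class ReadToLogEndSketch {
    // Removes every partition whose last-read position has reached its end
    // offset; partitions still behind are left in the map for another pass.
    public static Map<String, Long> removeCaughtUp(Map<String, Long> endOffsets,
                                                   Map<String, Long> positions) {
        Iterator<Map.Entry<String, Long>> it = endOffsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String topicPartition = entry.getKey();
            long endOffset = entry.getValue(); // unbox once, as the review suggests
            long lastReadOffset = positions.get(topicPartition);
            if (lastReadOffset >= endOffset) {
                System.out.printf("Read to end offset %d for %s%n", endOffset, topicPartition);
                it.remove();
            } else {
                System.out.printf("Behind end offset %d for %s; last-read offset is %d%n",
                        endOffset, topicPartition, lastReadOffset);
            }
        }
        return endOffsets;
    }

    public static void main(String[] args) {
        Map<String, Long> endOffsets = new HashMap<>(Map.of("topic-0", 5L, "topic-1", 3L));
        Map<String, Long> positions = Map.of("topic-0", 5L, "topic-1", 1L);
        System.out.println(removeCaughtUp(endOffsets, positions).keySet()); // only topic-1 remains
    }
}
```

The real method would keep polling until the map empties; the sketch only shows the per-pass bookkeeping and the two trace messages being discussed.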
[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
vvcephei commented on pull request #8540: URL: https://github.com/apache/kafka/pull/8540#issuecomment-620976620 Cherry-picked to 2.5 as 9e2785fd1ba0ed16604e01058bae6b60ff9f3d96
[GitHub] [kafka] showuon opened a new pull request #8581: MINOR: Fix typo and rephrase content in docs
showuon opened a new pull request #8581: URL: https://github.com/apache/kafka/pull/8581 1. fix typo: `atleast` -> `at least` 2. add missing `--` to be consistent 3. rephrase a sentence to make it clearer: before: `LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector` The original may mislead users into using JDK 1.8 u5, while JDK 1.8 u251, which includes many important bug fixes, has already been released. I rephrased it as below: after: `At the time when we write this, LinkedIn is running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
vvcephei commented on pull request #8540: URL: https://github.com/apache/kafka/pull/8540#issuecomment-620973163 Cherry-pick for 2.5 in progress...
[GitHub] [kafka] kkonstantine commented on pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed
kkonstantine commented on pull request #8204: URL: https://github.com/apache/kafka/pull/8204#issuecomment-620971604 JDK8 build failed on a relevant test: https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2007/testReport/junit/org.apache.kafka.connect.runtime/WorkerTest/testStartAndStopConnector/
[GitHub] [kafka] vvcephei commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id
vvcephei commented on pull request #8574: URL: https://github.com/apache/kafka/pull/8574#issuecomment-620971180 Thanks, @arkins ! Shame is a powerful motivator :)
[GitHub] [kafka] vvcephei commented on pull request #8578: KAFKA-9875: Make integration tests more resilient
vvcephei commented on pull request #8578: URL: https://github.com/apache/kafka/pull/8578#issuecomment-620970911 Thanks for the review, @guozhangwang . I've addressed (or responded to) your comments.
[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient
vvcephei commented on a change in pull request #8578: URL: https://github.com/apache/kafka/pull/8578#discussion_r417047054 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java ## @@ -106,7 +108,7 @@ public void before() { consumerConfiguration = new Properties(); consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); -consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, name.getMethodName() + "-consumer"); +consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, safeTestName + "-consumer"); Review comment: ```suggestion consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName); ``` No, I just got tired of messing with every test's idiosyncrasies.
[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient
vvcephei commented on a change in pull request #8578: URL: https://github.com/apache/kafka/pull/8578#discussion_r417046624 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java ## @@ -156,7 +157,7 @@ public void shouldPreservePartitionTimeOnKafkaStreamRestart() { assertThat(lastRecordedTimestamp, is(5000L)); } finally { kafkaStreams.close(); -cleanStateAfterTest(CLUSTER, kafkaStreams); +quietlyCleanStateAfterTest(CLUSTER, kafkaStreams); Review comment: Unfortunately, we generally can't use try-with-resources for these tests, since that makes the `kafkaStreams` reference out of scope for the finally block. We'd have to allow a reference to kafkaStreams to escape the try {} block to reference it either in finally or in an After method, which is just as messy as it currently is, if not more.
[GitHub] [kafka] ijuma commented on a change in pull request #8579: KAFKA-9930: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.
ijuma commented on a change in pull request #8579: URL: https://github.com/apache/kafka/pull/8579#discussion_r417044882 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -382,6 +382,11 @@ abstract class AbstractFetcherThread(name: String, "that the partition is being moved") partitionsWithError += topicPartition +case Errors.UNKNOWN_TOPIC_OR_PARTITION => + warn(s"Remote broker does not host the partition $topicPartition, which could indicate " +
+"that the partition is being created or deleted.") Review comment: Should it be `info` if we think this is expected? That would still show in the logs.
[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient
vvcephei commented on a change in pull request #8578: URL: https://github.com/apache/kafka/pull/8578#discussion_r417044713 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, fina } } -public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) { -driver.cleanUp(); +public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) { try { +driver.cleanUp(); cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT); -} catch (final InterruptedException e) { -throw new RuntimeException(e); +} catch (final RuntimeException | InterruptedException e) { +LOG.warn("Ignoring failure to clean test state", e); } Review comment: I share your concern, but I'm not sure about the conclusion. Yes, if there is state (such as a topic) that leaks from one test to the next, it can certainly cause difficult-to-debug failures. However, there are multiple things we can do to prevent/mitigate it: * delete state after tests (not to leave any garbage behind) * delete state before the tests (to ensure a clean slate for the test) * choose unique names for all resources of each test (this is where the other part of this PR comes in) Any one of these should be sufficient to prevent state from leaking in between tests, and most of these tests do all three. In other words, we have 3x redundancy guarding against such test pollution. If you look at all three of these measures, the clean up _after_ tests is actually the most optional, since tests can't tolerate failures in the clean up _before_ (because it also creates necessary topics), and choosing unique topic names per test is bulletproof and easy to fix (once we know what the problem is). 
Whether the cleanup is part of the test or in the `@After` method, the outcome is the same: if the method throws an exception, the test will fail. The downside of After is that it requires you to store the topic names in mutable class-level fields, which actually makes it more awkward to choose unique names per test.
[GitHub] [kafka] kkonstantine commented on pull request #8442: KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses
kkonstantine commented on pull request #8442: URL: https://github.com/apache/kafka/pull/8442#issuecomment-620964908 retest this please
[GitHub] [kafka] kkonstantine commented on pull request #8442: KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses
kkonstantine commented on pull request #8442: URL: https://github.com/apache/kafka/pull/8442#issuecomment-620964776 ok to test
[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient
vvcephei commented on a change in pull request #8578: URL: https://github.com/apache/kafka/pull/8578#discussion_r417040202 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java ## @@ -215,13 +216,13 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store } private Properties streamsConfiguration() { -final String applicationId = "streamsApp"; +final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties config = new Properties(); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); -config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + name.getMethodName()); +config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); -config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); +config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); Review comment: I wavered on this point, but each time you call `tempDirectory`, it should give you a completely new directory: ``` * Create a temporary relative directory in the default temporary-file directory with the given prefix. * * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix ``` So the prefix seems to be nice only for documenting which test a directory is for, not for enforcing any kind of test/directory uniqueness. I felt like it added more noise than value, so I just dropped all the prefixes.
[GitHub] [kafka] vvcephei commented on pull request #8580: KAFKA-9832: fix attempt to commit non-running tasks
vvcephei commented on pull request #8580: URL: https://github.com/apache/kafka/pull/8580#issuecomment-620962357 Hey @mjsax , do you have time for a quick review? This bug seems to have been introduced by https://github.com/apache/kafka/pull/8440/files#r407722022 , which attempts to commit all non-corrupted tasks. Some of these tasks may not be running. The Task implementations will throw an exception if we attempt to `prepareCommit` while not in state RUNNING (or RESTORING). We could make the task more permissive, so that it would ignore the commit to a task that is not in a committable state. I opted instead to filter out only the tasks in committable states, though. I was concerned that if we make prepareCommit more permissive, we might just complicate the rest of the commit lifecycle, because then the rest of it would also have to be permissive, etc. Thanks for the very nice test in your prior PR; it was easy to extend it to cover this case and also to add the regression test. WDYT?
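The filtering approach described above can be sketched with an illustrative task model (this is not Streams' actual `Task` interface; the states and helper names are stand-ins):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the fix described above: only tasks in a committable
// state (RUNNING or RESTORING) are offered for commit, since prepareCommit
// would throw for tasks in any other state.
public class CommittableTaskFilter {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    static class Task {
        final String id;
        final State state;
        Task(String id, State state) { this.id = id; this.state = state; }
        boolean isCommittable() {
            return state == State.RUNNING || state == State.RESTORING;
        }
    }

    static List<Task> tasksToCommit(List<Task> all) {
        List<Task> committable = new ArrayList<>();
        for (Task t : all) {
            if (t.isCommittable()) {
                committable.add(t); // tasks in other states are skipped, not committed
            }
        }
        return committable;
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(
                new Task("0_0", State.RUNNING),
                new Task("0_1", State.CREATED),    // would throw on prepareCommit
                new Task("1_0", State.RESTORING));
        System.out.println(tasksToCommit(tasks).size()); // 2
    }
}
```

Filtering at the caller keeps the per-task commit lifecycle strict, which matches the reasoning in the comment above: a permissive `prepareCommit` would force the rest of the commit path to be permissive too.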
[GitHub] [kafka] vvcephei opened a new pull request #8580: KAFKA-9832: fix attempt to commit non-running tasks
vvcephei opened a new pull request #8580: URL: https://github.com/apache/kafka/pull/8580 Fixes an attempt to commit potentially non-running tasks while recovering from task corruption. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-9931) Kafka Connect should accept '-1' as a valid replication factor
[ https://issues.apache.org/jira/browse/KAFKA-9931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094975#comment-17094975 ] Randall Hauch edited comment on KAFKA-9931 at 4/29/20, 1:52 AM: We'll need a KIP for this change, but it should be relatively straightforward if we don't change the default or the config files, and merely accept values that are either -1 or >= 1. However, I do think this is a good time for Connect's worker logic when creating internal topics to support other topic settings via worker config properties using the {{config.storage.topic.*}}, {{offset.storage.topic.*}}, and {{status.storage.topic.*}} prefixes. was (Author: rhauch): We'll need a KIP for this change, but it should be relatively straightforward if we don't change the default or the config files, and merely accept values that are either -1 or >= 1. However, I do think this is a good time for Connect's worker logic when creating internal topics to support other topic settings via worker config properties using the `config.storage.topic.*`, `offset.storage.topic.*`, and `status.storage.topic.*` prefixes. > Kafka Connect should accept '-1' as a valid replication factor > -- > > Key: KAFKA-9931 > URL: https://issues.apache.org/jira/browse/KAFKA-9931 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Labels: needs-kip > Fix For: 2.6.0 > > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic] > As of KIP-464, the adminclient can use '-1' as the replication factor or > partitions to use the broker defaults. The Kafka Connect Framework does not > currently accept anything less than 1 as a valid replication factor. 
This > should be changed so that Connect worker configurations can specify `-1` for > the internal topic replication factors to use the broker's default > replication factor for new topics.
[jira] [Comment Edited] (KAFKA-9931) Kafka Connect should accept '-1' as a valid replication factor
[ https://issues.apache.org/jira/browse/KAFKA-9931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094975#comment-17094975 ] Randall Hauch edited comment on KAFKA-9931 at 4/29/20, 1:51 AM: We'll need a KIP for this change, but it should be relatively straightforward if we don't change the default or the config files, and merely accept values that are either -1 or >= 1. However, I do think this is a good time for Connect's worker logic when creating internal topics to support other topic settings via worker config properties using the `config.storage.topic.*`, `offset.storage.topic.*`, and `status.storage.topic.*` prefixes. was (Author: rhauch): We'll need a KIP for this change, but it should be relatively straightforward if we don't change the default or the config files, and merely accept values that are either -1 or >= 1. > Kafka Connect should accept '-1' as a valid replication factor > -- > > Key: KAFKA-9931 > URL: https://issues.apache.org/jira/browse/KAFKA-9931 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Labels: needs-kip > Fix For: 2.6.0 > > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic] > As of KIP-464, the adminclient can use '-1' as the replication factor or > partitions to use the broker defaults. The Kafka Connect Framework does not > currently accept anything less than 1 as a valid replication factor. This > should be changed so that Connect worker configurations can specify `-1` for > the internal topic replication factors to use the broker's default > replication factor for new topics.
[jira] [Commented] (KAFKA-9931) Kafka Connect should accept '-1' as a valid replication factor
[ https://issues.apache.org/jira/browse/KAFKA-9931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094975#comment-17094975 ] Randall Hauch commented on KAFKA-9931: -- We'll need a KIP for this change, but it should be relatively straightforward if we don't change the default or the config files, and merely accept values that are either -1 or >= 1. > Kafka Connect should accept '-1' as a valid replication factor > -- > > Key: KAFKA-9931 > URL: https://issues.apache.org/jira/browse/KAFKA-9931 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Labels: needs-kip > Fix For: 2.6.0 > > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic] > As of KIP-464, the adminclient can use '-1' as the replication factor or > partitions to use the broker defaults. The Kafka Connect Framework does not > currently accept anything less than 1 as a valid replication factor. This > should be changed so that Connect worker configurations can specify `-1` for > the internal topic replication factors to use the broker's default > replication factor for new topics.
[jira] [Updated] (KAFKA-9931) Kafka Connect should accept '-1' as a valid replication factor
[ https://issues.apache.org/jira/browse/KAFKA-9931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9931: - Labels: needs-kip (was: ) > Kafka Connect should accept '-1' as a valid replication factor > -- > > Key: KAFKA-9931 > URL: https://issues.apache.org/jira/browse/KAFKA-9931 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Labels: needs-kip > Fix For: 2.6.0 > > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic] > As of KIP-464, the admin client can use '-1' for the replication factor or > partition count to use the broker defaults. The Kafka Connect Framework does not > currently accept anything less than 1 as a valid replication factor. This > should be changed so that Connect worker configurations can specify `-1` for > the internal topic replication factors to default to the broker's default > replication factor for new topics.
[jira] [Created] (KAFKA-9931) Kafka Connect should accept '-1' as a valid replication factor
Randall Hauch created KAFKA-9931: Summary: Kafka Connect should accept '-1' as a valid replication factor Key: KAFKA-9931 URL: https://issues.apache.org/jira/browse/KAFKA-9931 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.5.0 Reporter: Randall Hauch Assignee: Randall Hauch Fix For: 2.6.0 [https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic] As of KIP-464, the admin client can use '-1' for the replication factor or partition count to use the broker defaults. The Kafka Connect Framework does not currently accept anything less than 1 as a valid replication factor. This should be changed so that Connect worker configurations can specify `-1` for the internal topic replication factors to default to the broker's default replication factor for new topics.
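For illustration, once `-1` is accepted, a distributed worker configuration might look like the following sketch (the `*.replication.factor` keys are Connect's existing worker settings; the `-1` semantics are the proposed change and require brokers that support KIP-464):

```properties
# Sketch of a distributed Connect worker config after KAFKA-9931:
# -1 would defer each internal topic to the broker's default.replication.factor.
config.storage.topic=connect-configs
config.storage.replication.factor=-1
offset.storage.topic=connect-offsets
offset.storage.replication.factor=-1
status.storage.topic=connect-status
status.storage.replication.factor=-1
```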
[GitHub] [kafka] efeg opened a new pull request #8579: KAFKA-9930: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.
efeg opened a new pull request #8579: URL: https://github.com/apache/kafka/pull/8579 When does UnknownTopicOrPartitionException typically occur? * Upon a topic creation, a follower broker of a new partition starts its replica fetcher before the prospective leader broker of the new partition receives the leadership information from the controller (see [KAFKA-6221](https://issues.apache.org/jira/browse/KAFKA-6221)). * Upon a topic deletion, a follower broker of a to-be-deleted partition starts its replica fetcher after the leader broker of the to-be-deleted partition processes the deletion information from the controller. * As expected, clusters with frequent topic creation and deletion report UnknownTopicOrPartitionException with relatively higher frequency. What is the impact? * Exception tracking systems identify the error logs with UnknownTopicOrPartitionException as an exception. This results in a lot of noise for a transient issue that is expected to recover by itself and is a natural consequence of Kafka's asynchronous state propagation. Why not move it to a log level lower than warn? * Despite typically being a transient issue, UnknownTopicOrPartitionException may also indicate real issues if it doesn't fix itself after a short period of time. To ensure detection of such scenarios, we set the log level to warn. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-9930) Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.
Adem Efe Gencer created KAFKA-9930: -- Summary: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion. Key: KAFKA-9930 URL: https://issues.apache.org/jira/browse/KAFKA-9930 Project: Kafka Issue Type: Bug Components: logging Affects Versions: 2.5.0, 2.4.0, 2.3.0, 2.2.0, 2.1.0, 2.0.0, 1.1.0, 1.0.0, 0.11.0.0, 0.10.0.0 Reporter: Adem Efe Gencer Assignee: Adem Efe Gencer When does UnknownTopicOrPartitionException typically occur? * Upon a topic creation, a follower broker of a new partition starts its replica fetcher before the prospective leader broker of the new partition receives the leadership information from the controller. Apache Kafka has an open issue about this (see KAFKA-6221). * Upon a topic deletion, a follower broker of a to-be-deleted partition starts its replica fetcher after the leader broker of the to-be-deleted partition processes the deletion information from the controller. * As expected, clusters with frequent topic creation and deletion report UnknownTopicOrPartitionException with relatively higher frequency. What is the impact? * Exception tracking systems identify the error logs with UnknownTopicOrPartitionException as an exception. This results in a lot of noise for a transient issue that is expected to recover by itself and is a natural consequence of Kafka's asynchronous state propagation. Why not move it to a log level lower than warn? * Despite typically being a transient issue, UnknownTopicOrPartitionException may also indicate real issues if it doesn't fix itself after a short period of time. To ensure detection of such scenarios, we set the log level to warn.
[GitHub] [kafka] guozhangwang commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient
guozhangwang commented on a change in pull request #8578: URL: https://github.com/apache/kafka/pull/8578#discussion_r417015562 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java ## @@ -215,13 +216,13 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store } private Properties streamsConfiguration() { -final String applicationId = "streamsApp"; +final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties config = new Properties(); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); -config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + name.getMethodName()); +config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); -config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); +config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); Review comment: Is it safer to encode the appID as part of the dir path to avoid collision? 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, fina } } -public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) { -driver.cleanUp(); +public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) { try { +driver.cleanUp(); cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT); -} catch (final InterruptedException e) { -throw new RuntimeException(e); +} catch (final RuntimeException | InterruptedException e) { +LOG.warn("Ignoring failure to clean test state", e); } Review comment: req: Actually deleting topics after test is critical for some tests: I've encountered some cases where the same topics are reused mistakenly across different test cases within the single class. But I feel that it is better to put the topic deletion in the `@after` function while leaving `cleanUp()` as part of the test function itself. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java ## @@ -156,7 +157,7 @@ public void shouldPreservePartitionTimeOnKafkaStreamRestart() { assertThat(lastRecordedTimestamp, is(5000L)); } finally { kafkaStreams.close(); -cleanStateAfterTest(CLUSTER, kafkaStreams); +quietlyCleanStateAfterTest(CLUSTER, kafkaStreams); Review comment: nit: we can put kafkaStreams in a try block. 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java ## @@ -106,7 +108,7 @@ public void before() { consumerConfiguration = new Properties(); consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); -consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, name.getMethodName() + "-consumer"); +consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, safeTestName + "-consumer"); Review comment: Somewhere else it is set as `"group-" + safeTestName`, is this change intentional? ## File path: streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java ## @@ -243,7 +251,7 @@ public void shouldRecoverBufferAfterShutdown() { } finally { driver.close(); -cleanStateAfterTest(CLUSTER, driver); +quietlyCleanStateAfterTest(CLUSTER, driver); Review comment: nit: ditto here, we can put `driver` in the try block. And ditto elsewhere. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gwenshap commented on pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test
gwenshap commented on pull request #8518: URL: https://github.com/apache/kafka/pull/8518#issuecomment-620932278 No failures, nice :) Great update @lbradstreet and thanks for contributing.
[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0
bseenu commented on a change in pull request #7577: URL: https://github.com/apache/kafka/pull/7577#discussion_r417002126 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java ## @@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) { Checkpoint.unwrapGroup(record.sourcePartition()), System.currentTimeMillis() - record.timestamp()); } + +private void refreshIdleConsumerGroupOffset() { +Map> consumerGroupsDesc = targetAdminClient +.describeConsumerGroups(consumerGroups).describedGroups(); + +for (String group : consumerGroups) { +try { +if (consumerGroupsDesc.get(group) == null) { +// if consumerGroupsDesc does not contain this group, it should be the new consumer +// group created at source cluster and its offsets should be sync-ed to target +newConsumerGroup.add(group); +continue; +} +ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get(); +// sync offset to the target cluster only if the state of current consumer group is idle or dead +ConsumerGroupState consumerGroupState = consumerGroupDesc.state(); +if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) { +idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group) +.partitionsToOffsetAndMetadata().get().entrySet()); +} +} catch (InterruptedException | ExecutionException e) { +log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e); +} +} +} + +Map> syncGroupOffset() { +Map> offsetToSyncAll = new HashMap<>(); + +// first, sync offsets for the idle consumers at target +for (Map.Entry>> group : idleConsumerGroupsOffset.entrySet()) { +String consumerGroupId = group.getKey(); +// for each idle consumer at target, read the checkpoints (converted upstream offset) +// from the pre-populated map +Map convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId); + +if (convertedUpstreamOffset == null) 
continue; + +Map offsetToSync = new HashMap<>(); +for (Entry entry : group.getValue()) { Review comment: Yes
[GitHub] [kafka] ning2008wisc commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0
ning2008wisc commented on a change in pull request #7577: URL: https://github.com/apache/kafka/pull/7577#discussion_r416995009 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java ## @@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) { Checkpoint.unwrapGroup(record.sourcePartition()), System.currentTimeMillis() - record.timestamp()); } + +private void refreshIdleConsumerGroupOffset() { +Map> consumerGroupsDesc = targetAdminClient +.describeConsumerGroups(consumerGroups).describedGroups(); + +for (String group : consumerGroups) { +try { +if (consumerGroupsDesc.get(group) == null) { +// if consumerGroupsDesc does not contain this group, it should be the new consumer +// group created at source cluster and its offsets should be sync-ed to target +newConsumerGroup.add(group); +continue; +} +ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get(); +// sync offset to the target cluster only if the state of current consumer group is idle or dead +ConsumerGroupState consumerGroupState = consumerGroupDesc.state(); +if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) { +idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group) +.partitionsToOffsetAndMetadata().get().entrySet()); +} +} catch (InterruptedException | ExecutionException e) { +log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e); +} +} +} + +Map> syncGroupOffset() { +Map> offsetToSyncAll = new HashMap<>(); + +// first, sync offsets for the idle consumers at target +for (Map.Entry>> group : idleConsumerGroupsOffset.entrySet()) { +String consumerGroupId = group.getKey(); +// for each idle consumer at target, read the checkpoints (converted upstream offset) +// from the pre-populated map +Map convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId); + +if (convertedUpstreamOffset == null) 
continue; + +Map offsetToSync = new HashMap<>(); +for (Entry entry : group.getValue()) { Review comment: If I am understanding right, are you asking about this scenario: consumer A is consuming from Topic `x` and `y`, and MM is replicating the offset of consumer A for Topic `x` and `y`. What if consumer A starts consuming from Topic `x`, `y` and `z`, where `z` is a new topic: why does MM not replicate the offset of consumer A for Topic `z`?
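The gating logic discussed in this thread (sync offsets to the target cluster only for consumer groups with no active members there) can be sketched standalone. This is a simplified illustration with hypothetical names; the real `MirrorCheckpointTask` uses `ConsumerGroupState` from the Kafka clients library:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class IdleGroupFilter {
    // Simplified stand-in for org.apache.kafka.common.ConsumerGroupState.
    enum GroupState { STABLE, EMPTY, DEAD }

    // Only groups with no live members (EMPTY) or already removed (DEAD) are safe
    // to overwrite: writing offsets for a STABLE group would race with the
    // group's own commits on the target cluster.
    static List<String> groupsSafeToSync(Map<String, GroupState> states) {
        return states.entrySet().stream()
                .filter(e -> e.getValue() == GroupState.EMPTY || e.getValue() == GroupState.DEAD)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

This is why the task snippet above checks for `ConsumerGroupState.EMPTY` or `ConsumerGroupState.DEAD` before recording a group's offsets for sync.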
[GitHub] [kafka] zhaohaidao commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…
zhaohaidao commented on pull request #8550: URL: https://github.com/apache/kafka/pull/8550#issuecomment-620915590 @abbccdda Hi, PR updated. Could you continue to review it?
[GitHub] [kafka] ableegoldman commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on pull request #8540: URL: https://github.com/apache/kafka/pull/8540#issuecomment-620909428 One unrelated failure: `MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete`
[jira] [Created] (KAFKA-9929) Support reverse iterator on WindowStore
Jorge Esteban Quilcate Otoya created KAFKA-9929: --- Summary: Support reverse iterator on WindowStore Key: KAFKA-9929 URL: https://issues.apache.org/jira/browse/KAFKA-9929 Project: Kafka Issue Type: Improvement Components: streams Reporter: Jorge Esteban Quilcate Otoya Currently, WindowStore fetch operations return an iterator sorted from earliest to latest result: ``` * For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest * available window to the newest/latest window. ``` We have a use-case where traces are stored in a WindowStore and Kafka Streams is used to create a materialized view of traces. A query request comes with a time range (e.g. now-1h, now) and wants to return the most recent results, i.e. fetch from this period of time, iterate and pattern match the latest/most recent traces, and if there are enough results, reply without moving further on the iterator. The same store is used to search for previous traces. In this case, it searches a key over the last day and, if traces are found, we would also like to iterate from the most recent. RocksDB seems to support iterating backward and forward: [https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound] For reference: This in some way extracts some bits from this previous issue: https://issues.apache.org/jira/browse/KAFKA-4212: > The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment > dropping, but it stores multiple items per key, based on their timestamp. But > this store can be repurposed as a cache by fetching the items in reverse > chronological order and returning the first item found. I would like to know if there is any impediment on RocksDB or WindowStore to support this. Adding an argument to reverse the direction in the current fetch methods would be great: ``` WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD) ```
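A minimal sketch of the requested semantics, using a `TreeMap` as a stand-in for the RocksDB-backed store (the `Direction` enum mirrors the ticket's proposed `fetch(from, to, Direction.BACKWARD|FORWARD)` signature and is not an existing Streams API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class ReverseFetchSketch {
    enum Direction { FORWARD, BACKWARD }

    // Values keyed by window start timestamp, mimicking the time-ordered
    // layout of a RocksDB-backed WindowStore segment.
    static List<String> fetch(TreeMap<Long, String> byTime, long from, long to, Direction dir) {
        NavigableMap<Long, String> range = byTime.subMap(from, true, to, true);
        if (dir == Direction.BACKWARD) {
            range = range.descendingMap();   // analogous to RocksDB's reverse iteration
        }
        return new ArrayList<>(range.values());
    }
}
```

With a backward fetch, the use-case above could stop consuming the iterator as soon as it has matched enough recent traces, instead of draining the whole range.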
[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient
vvcephei commented on a change in pull request #8578: URL: https://github.com/apache/kafka/pull/8578#discussion_r416969713 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java ## @@ -101,8 +102,8 @@ public void before() throws Exception { builder = new StreamsBuilder(); createTopics(); streamsConfiguration = new Properties(); -final String applicationId = "global-thread-shutdown-test" + testName.getMethodName(); -streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); +final String safeTestName = safeUniqueTestName(getClass(), testName); +streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); Review comment: I've standardized all the usages to be just "app", followed by the generated name, since the generated name contains the same information that we previously hand-wrote into the prefix or suffix. All we really need to do is ensure that the app id won't collide with a group name that we might use in a verification consumer, for example. For that reason, I've never used the generated name "plain", but always scoped it to the usage (app id, group id, input topic, etc.). It's not super important to apply these ideas universally, but I felt it would make it easier to write more tests like it in the future if I just made a full pass on all the tests to make them all look the same. 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java ## @@ -88,16 +89,17 @@ private String stateStoreName; @Rule -public TestName name = new TestName(); +public TestName testName = new TestName(); @Before public void before() { -inputTopicName = "input-topic-" + name.getMethodName(); -outputTopicName = "output-topic-" + name.getMethodName(); -stateStoreName = "lagfetch-test-store" + name.getMethodName(); +final String safeTestName = safeUniqueTestName(getClass(), testName); +inputTopicName = "input-topic-" + safeTestName; +outputTopicName = "output-topic-" + safeTestName; +stateStoreName = "lagfetch-test-store" + safeTestName; streamsConfiguration = new Properties(); -streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + name.getMethodName()); +streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + safeTestName); Review comment: ```suggestion streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); ``` ## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, fina } } -public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) { -driver.cleanUp(); +public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) { try { +driver.cleanUp(); cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT); -} catch (final InterruptedException e) { -throw new RuntimeException(e); +} catch (final RuntimeException | InterruptedException e) { +LOG.warn("Ignoring failure to clean test state", e); } Review comment: This is really the fix for KAFKA-9875. 
The other change just hopefully reduces the probability that ignoring the exceptions could cause subsequent failures (e.g., if the topics don't get deleted before the next test, at least the next one will have different topic names). I've verified that all usages of this method are ok to ignore potential exceptions. Namely, as long as the test logic itself doesn't want to ensure that any topics got deleted, and as long as this method is the last line in the method, then it should be fine just to ignore failures here. I also considered just deleting the method, but if it does succeed, then it leaves less garbage around for subsequent tests, so it feels better to at least attempt a cleanup. ## File path: streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java ## @@ -106,7 +106,9 @@ public static void startKafkaStreamsAndWaitForRunningState(final KafkaStreams ka kafkaStreams.start(); assertThat( "KafkaStreams did not transit to RUNNING state within " + timeoutMs + " milli seconds.", -countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS), equalTo(true)); +countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS), +equalTo(true) +); Review comment: just fixing the formatting.
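The log-and-swallow pattern described above can be sketched generically (a hypothetical helper, not the actual `IntegrationTestUtils` code):

```java
class QuietCleanup {
    @FunctionalInterface
    interface Cleanup { void run() throws Exception; }

    // Best-effort cleanup: a failure to delete topics after a test should not
    // fail a test that already passed, so log and swallow instead of rethrowing.
    static boolean quietly(Cleanup action) {
        try {
            action.run();
            return true;
        } catch (Exception e) {
            System.err.println("Ignoring failure to clean test state: " + e);
            return false;
        }
    }
}
```

This only stays safe under the constraint noted in the review: the quiet cleanup must be the last invocation in the test, so no later assertion depends on it having succeeded.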
[GitHub] [kafka] vvcephei opened a new pull request #8578: KAFKA-9875: Make integration tests more resilient
vvcephei opened a new pull request #8578: URL: https://github.com/apache/kafka/pull/8578 The ticket is for a flaky test that failed to clean up topics _after_ the test, which isn't strictly necessary for test success. * alter the "clean up after test" method to never throw an exception (after verifying it's always the last invocation inside a finally block, so it won't break any test semantics) * consolidated the naming of all integration tests' app ids, topics, etc., by introducing a new test utility to generate safe, unique, descriptive names. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
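One possible shape of such a name-generating utility, as a sketch (the actual `safeUniqueTestName` in the Streams test utils may differ in detail):

```java
import java.util.concurrent.atomic.AtomicInteger;

class TestNames {
    private static final AtomicInteger SEQUENCE = new AtomicInteger();

    // Derive a name that is unique per test run and legal both as a topic name
    // and a consumer group id (alphanumerics, '.', '_', '-'). JUnit method names
    // for parameterized tests can contain brackets, which topic names reject.
    static String safeUniqueTestName(Class<?> testClass, String methodName) {
        String raw = testClass.getSimpleName() + "-" + methodName + "-" + SEQUENCE.incrementAndGet();
        return raw.replaceAll("[^a-zA-Z0-9._-]", "-");
    }
}
```

Scoping every usage with a prefix ("app-", "group-", "input-topic-") then guarantees that an app id can never collide with, say, a verification consumer's group id built from the same generated name.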
[GitHub] [kafka] steverod commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
steverod commented on pull request #8542: URL: https://github.com/apache/kafka/pull/8542#issuecomment-620893016 retest this please
[GitHub] [kafka] cmccabe commented on a change in pull request #8569: KIP-551: Expose disk read and write metrics
cmccabe commented on a change in pull request #8569: URL: https://github.com/apache/kafka/pull/8569#discussion_r416961523 ## File path: core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala ## @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.nio.file.{Files, Paths} + +import org.apache.kafka.common.utils.Time +import org.slf4j.Logger + +import scala.jdk.CollectionConverters._ + +/** + * Retrieves Linux /proc/self/io metrics. + */ +class LinuxIoMetricsCollector(val procPath: String, val time: Time, val logger: Logger) { + import LinuxIoMetricsCollector._ + var lastUpdateMs = -1L + var cachedReadBytes = 0L + var cachedWriteBytes = 0L + + def readBytes(): Long = this.synchronized { +val curMs = time.milliseconds() +if (curMs != lastUpdateMs) { + updateValues(curMs) +} +cachedReadBytes + } + + def writeBytes(): Long = this.synchronized { +val curMs = time.milliseconds() +if (curMs != lastUpdateMs) { + updateValues(curMs) +} +cachedWriteBytes + } + + /** + * Read /proc/self/io. + * + * Generally, each line in this file contains a prefix followed by a colon and a number. 
+ * + * For example, it might contain this: + * rchar: 4052 + * wchar: 0 + * syscr: 13 + * syscw: 0 + * read_bytes: 0 + * write_bytes: 0 + * cancelled_write_bytes: 0 + */ + def updateValues(now: Long): Boolean = this.synchronized { Review comment: Unless we choose to read this file in a background thread, there isn't a reason to avoid using a lock here.
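For reference, the `name: value` parsing that such a collector performs on `/proc/self/io` can be sketched in Java (a simplified stand-in for the Scala implementation above; the real collector additionally caches the parsed values for up to a millisecond, as shown in the diff):

```java
import java.util.HashMap;
import java.util.Map;

class ProcIoParser {
    // Parse the "name: value" lines of /proc/self/io into a map.
    // Lines that don't match the prefix-colon-number shape are skipped.
    static Map<String, Long> parse(String contents) {
        Map<String, Long> values = new HashMap<>();
        for (String line : contents.split("\n")) {
            int colon = line.indexOf(':');
            if (colon > 0) {
                values.put(line.substring(0, colon).trim(),
                           Long.parseLong(line.substring(colon + 1).trim()));
            }
        }
        return values;
    }
}
```

The metrics of interest for KIP-551 are `read_bytes` and `write_bytes`, which count actual block-device I/O rather than all `read(2)`/`write(2)` traffic (`rchar`/`wchar`).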
[GitHub] [kafka] ijuma commented on pull request #8567: KAFKA-9652: Fix throttle metric in RequestChannel and request log due to KIP-219
ijuma commented on pull request #8567: URL: https://github.com/apache/kafka/pull/8567#issuecomment-620887867 2 jobs passed, 1 unrelated flaky test failed: > org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
[jira] [Comment Edited] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094900#comment-17094900 ] Georgi Petkov edited comment on KAFKA-9921 at 4/28/20, 10:23 PM: - [~ableegoldman] Yeah, I agree that probably not much can be done in terms of caching (compared to the options without _retainDuplicates_). I totally agree that many of the features like the null value behavior are correct and make perfect sense from the point of view of the features implemented with it. Still, it's strange from the perspective where you use it standalone. *1-2 sentences clarifying the behavior with null values in the _WindowStateStore_ documentation could definitely help.* In addition, as I said, if this is the desired behavior *you can easily skip calling RocksDB for null values (when using _retainDuplicates)_. This would both make the intention clearer and obviously avoid unnecessary calls.* I do need exactly a stream-stream join but without the repartition part. I want to get matches when there are new events in whichever stream, support duplicate keys in the stream, and I also use _WindowStateStore_ only for the retention policy. In fact, due to the lack of many examples, I was looking at the stream-stream join implementation to find out how to correctly use the _WindowStateStores_. I'm building a library for some common yet not at all trivial operations on streams that you may need, like topological sorting. Therefore I don't know if the user will provide null values or not. I was curious about the behavior with null values so I know what I'm providing to the user. I've tested it and that's how I found out what the exact behavior is. *I'm not sure that an in-memory or any custom state store will solve this.* Yes, in-memory will help with the efficient append because it avoids any expensive call and serializations/deserializations.
Nevertheless, *you will always have the serializations/deserializations somewhere, namely in the changelog topic, where you also pay in bandwidth* (not just precious processing time). Even if the list is fixed to let's say only 5 items you will still have 15 (1 + 2 + 3 + 4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - O(n^2). Combined with the fact that I want to provide a library to many different users (and duplicates count may vary a lot between usages) *to me it's best to implement just as in the stream-stream join - with duplicates*. Still, it was a great discussion and made me more confident in my decisions. Thank you for your assistance. *Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and _TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.* was (Author: georgi.petkov): [~ableegoldman] Yeah, I agree that probably not much can be done in terms of caching (compared to the options without _retainDuplicates_). I totally agree that many of the features like the null value behavior are correct and make perfect sense from point of view of the features implemented with it. Still, it's strange from the perspective where you use it standalone. *1-2 sentences clarifying the behavior with null values in the _WindowStateStore_ documentation could definitely help.* In addition, as I said if this is the desired behavior *you can easily skip calling RocksDB for null values (when using _retainDuplicates)_. This would make both the intention clearer and obviously avoid unnecessary calls.* I do need exactly stream-stream join but without the repartition part. I want to get matches when there are new events in whichever stream and I also use _WindowStateStore_ only for the retention policy. In fact, due to the lack of many examples, I was looking at the stream-stream join implementation to find out how to correctly use the _WindowStateStores_.
I'm building a library for some common yet not trivial at all operations on streams that you may need like topological sorting. Therefore I don't know if the user will provide null values or not. I was curious about the behavior with null values so I know what I'm providing to the user. I've tested it and that's how I found out what is the exact behavior. *I'm not sure that an in-memory or any custom state store will make it.* Yes, in-memory will help with the efficient append because it avoids any expensive call and serializations/deserializations. Nevertheless, *you will always have the serializations/deserializations somewhere and this is the changelog topic and there you have also bandwidth* (not just precious processing time). Even if the list is fixed to let's say only 5 items you will still have 15 (1 + 2 + 3 + 4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - O(n^2). Combined with the fact that I want to provide a library to many different users (and duplicates count may vary a lot
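The changelog-growth argument above can be made concrete. The sketch below is illustrative plain Java (not Kafka code, and the method names are hypothetical): it compares the number of changelog entries produced by re-writing a per-key list on every append with the count produced by retaining duplicates, showing the 15-versus-5 figure from the comment and the O(n^2) trend.

```java
public class ChangelogGrowth {
    // Appending the i-th value re-serializes and re-writes the whole list of
    // size i, so after n appends the changelog has carried
    // 1 + 2 + ... + n = n * (n + 1) / 2 list elements in total.
    static long recordsWithListPerKey(int n) {
        long total = 0;
        for (int i = 1; i <= n; i++) {
            total += i; // each append writes the list grown to i elements
        }
        return total;
    }

    // With retainDuplicates, each append writes exactly one changelog record.
    static long recordsWithDuplicates(int n) {
        return n;
    }

    public static void main(String[] args) {
        System.out.println(recordsWithListPerKey(5));  // 15, as in the comment
        System.out.println(recordsWithDuplicates(5));  // 5
    }
}
```

For a list capped at 5 items the overhead is modest, but the gap widens quadratically as the duplicate count per key grows, which is the bandwidth concern raised above.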
[jira] [Commented] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
[ https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094902#comment-17094902 ] Guozhang Wang commented on KAFKA-9928: -- I found that for the failed run, around the time when the producer used by {{produceTopicValues(streamTopic);}} (around line 172) is being closed, the following entries are printed (whereas successful runs do not have them), cc [~mjsax]: {code} [2020-04-28 15:10:58,458] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Fetch offset 9 is out of range for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1261) [2020-04-28 15:10:58,458] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Resetting offset for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:383) [2020-04-28 15:10:58,459] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Fetch offset 9 is out of range for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1261) [2020-04-28 15:10:58,460] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Resetting offset for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState:383) [2020-04-28 15:10:58,461] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Fetch offset 9 is out of range for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1261) [2020-04-28 15:10:58,461] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Resetting offset for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:383) [2020-04-28 15:10:58,566] INFO [Producer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-producer, transactionalId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-1] Discovered group coordinator localhost:54279 (id: 0 rack: null) (org.apache.kafka.clients.producer.internals.TransactionManager:1525) [2020-04-28 15:11:00,740] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController:66) {code} Note that this CLUSTER only has one broker. > Flaky > GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta] > - > > Key: KAFKA-9928 > URL: https://issues.apache.org/jira/browse/KAFKA-9928 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Matthias J. Sax >Priority: Major > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 3. waiting for > final values > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) > at
[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094900#comment-17094900 ] Georgi Petkov commented on KAFKA-9921: -- [~ableegoldman] Yeah, I agree that probably not much can be done in terms of caching (compared to the options without _retainDuplicates_). I totally agree that many of the features, like the null-value behavior, are correct and make perfect sense from the point of view of the features implemented with them. Still, it's strange from the perspective where you use it standalone. *1-2 sentences clarifying the behavior with null values in the _WindowStateStore_ documentation could definitely help.* In addition, as I said, if this is the desired behavior, *you can easily skip calling RocksDB for null values (when using _retainDuplicates_). This would both make the intention clearer and avoid unnecessary calls.* I need exactly a stream-stream join, but without the repartition part. I want to get matches when there are new events in either stream, and I also use _WindowStateStore_ only for the retention policy. In fact, due to the lack of examples, I was looking at the stream-stream join implementation to find out how to use the _WindowStateStores_ correctly. I'm building a library for some common yet far-from-trivial operations on streams, like topological sorting. Therefore I don't know whether the user will provide null values or not. I was curious about the behavior with null values so that I know what I'm providing to the user. I've tested it, and that's how I found out what the exact behavior is. *I'm not sure that an in-memory or any custom state store will solve this.* Yes, in-memory will help with the efficient append because it avoids any expensive calls and serializations/deserializations. 
Nevertheless, *you will always have the serializations/deserializations somewhere - namely in the changelog topic - and there bandwidth matters as well* (not just precious processing time). Even if the list is capped at, say, only 5 items, you will still have 15 (1 + 2 + 3 + 4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - O(n^2). Combined with the fact that I want to provide a library to many different users (and the duplicate count may vary a lot between usages), *to me it's best to implement it just as in the stream-stream join - with duplicates*. Still, it was a great discussion and it made me more confident in my decisions. Thank you for your assistance. *Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and _TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.* > Caching is not working properly with WindowStateStore when retaining > duplicates > --- > > Key: KAFKA-9921 > URL: https://issues.apache.org/jira/browse/KAFKA-9921 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Georgi Petkov >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0, 2.5.1 > > > I'm using the current latest version 2.5.0, but this is not something new. > I have _WindowStateStore_ configured as follows (where _true_ stands for > the _retainDuplicates_ parameter): > _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, > retentionPeriod, windowSize, *true*), keySerde, > valueSerde)*.withCachingEnabled()*)_ > If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that > order, when reading them through the iterator I'll get the values *4, 2, 3, 4*. > I've done a bit of investigation myself and the problem is that *the whole > caching feature is written without consideration of the case where duplicates > are retained*. 
> The observed behavior is due to having the last value in the cache (it > can hold only one value per key, since it's not aware of the retain-duplicates option), and > that value is read first (while skipping the first entry from the RocksDB iterator even > though the values are different). This can be observed (for version 2.5.0) in > _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 > values are read from the RocksDB iterator, so they are as expected. > As I said, the whole feature does not consider the _retainDuplicates_ option, > so there are other examples of incorrect behavior, like in > _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you > would skip one duplicate entry in the RocksDB iterator for the given key. > In my use case, I want to persist a list of values for a given key without > increasing the complexity to linear for a single event (which would be the > case if I were always reading the current list, appending one value, and writing > it back). So I go for
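The mis-merge described in the report can be reproduced with a small stand-alone model (plain Java, no Kafka dependency; class and method names here are illustrative, not Kafka's). The cache holds only the latest value for the key, and the merge emits that cached value first while skipping one store entry with the "same" key, which turns the stored sequence 1, 2, 3, 4 into 4, 2, 3, 4.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a merged cache/store read when duplicates are retained.
public class MergedIteratorModel {
    static List<Integer> mergedRead(List<Integer> storeValues, Integer cachedLatest) {
        List<Integer> result = new ArrayList<>();
        int storeIndex = 0;
        // The cache is unaware of retainDuplicates: it kept only the latest
        // value for the key. On a key match the merge returns the cache entry
        // and advances the store iterator by one, mirroring the behavior the
        // report attributes to AbstractMergedSortedCacheStoreIterator#next().
        if (cachedLatest != null && storeIndex < storeValues.size()) {
            result.add(cachedLatest);
            storeIndex++; // skips store value 1 even though it differs from the cached 4
        }
        while (storeIndex < storeValues.size()) {
            result.add(storeValues.get(storeIndex++));
        }
        return result;
    }

    public static void main(String[] args) {
        // Four puts of 1, 2, 3, 4 under one key: RocksDB keeps all duplicates,
        // the cache keeps only the last value.
        System.out.println(mergedRead(List.of(1, 2, 3, 4), 4)); // [4, 2, 3, 4]
    }
}
```

A duplicates-aware merge would have to either bypass the cache entirely for retain-duplicates stores or treat each duplicate as a distinct entry, which is essentially the fix direction discussed above.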
[GitHub] [kafka] cmccabe edited a comment on pull request #8569: KIP-551: Expose disk read and write metrics
cmccabe edited a comment on pull request #8569: URL: https://github.com/apache/kafka/pull/8569#issuecomment-620884289 > In addition to block-level read/write, would there be a benefit to expose file system read/write metrics? It's better to have that discussion on the mailing list. This PR is just about KIP-551. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
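For context on what KIP-551 measures: the KIP exposes cumulative disk bytes read and written by the broker process, and on Linux such per-process numbers are readily available from {{/proc/self/io}}. The helper below is an illustrative sketch of parsing that file's `key: value` format; it is not the PR's code, and the class name is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ProcIoMetrics {
    // Extracts the value of one "key: value" line from /proc/self/io content,
    // e.g. "read_bytes: 4096". Returns -1 if the field is absent.
    static long parseField(String procIoContent, String field) {
        for (String line : procIoContent.split("\n")) {
            if (line.startsWith(field + ":")) {
                return Long.parseLong(line.substring(field.length() + 1).trim());
            }
        }
        return -1L;
    }

    public static void main(String[] args) throws IOException {
        Path procIo = Path.of("/proc/self/io"); // Linux-only pseudo-file
        if (Files.exists(procIo)) {
            String content = Files.readString(procIo);
            System.out.println("read_bytes=" + parseField(content, "read_bytes"));
            System.out.println("write_bytes=" + parseField(content, "write_bytes"));
        }
    }
}
```

Note that `read_bytes`/`write_bytes` count actual block-device I/O, whereas `rchar`/`wchar` count all read/write syscall traffic including page-cache hits; the distinction matters when interpreting such metrics.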
[GitHub] [kafka] junrao commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
junrao commented on pull request #8542: URL: https://github.com/apache/kafka/pull/8542#issuecomment-620880304 @steverod : Do the JDK 8 and Scala 2.12 tests pass for you locally? Not sure why the Jenkins test failed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on pull request #8568: URL: https://github.com/apache/kafka/pull/8568#issuecomment-620879654 Merged to trunk. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
junrao commented on pull request #8543: URL: https://github.com/apache/kafka/pull/8543#issuecomment-620879490 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
vvcephei commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416947800

## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ##

```diff
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final
      * @param <K> Key type of the data records
      * @param <V> Value type of the data records
      */
-    @SuppressWarnings("WeakerAccess")
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
                                                                          final Headers headers,
                                                                          final Long timestamp,
-                                                                         final boolean enableTransactions)
-        throws ExecutionException, InterruptedException {
+                                                                         final boolean enableTransactions) {
         try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
             if (enableTransactions) {
                 producer.initTransactions();
                 producer.beginTransaction();
             }
             for (final KeyValue<K, V> record : records) {
-                final Future<RecordMetadata> f = producer.send(
-                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-                f.get();
+                producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
```

Review comment: Thanks. That's what I was asking for confirmation on. I realize now that the structure of my sentence was ambiguous. I agree that the method contract is that the batch should be synchronously produced, not that each record should be synchronously produced, so this change looks good to me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416944392

## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ##

```diff
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final
      * @param <K> Key type of the data records
      * @param <V> Value type of the data records
      */
-    @SuppressWarnings("WeakerAccess")
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
                                                                          final Headers headers,
                                                                          final Long timestamp,
-                                                                         final boolean enableTransactions)
-        throws ExecutionException, InterruptedException {
+                                                                         final boolean enableTransactions) {
         try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
             if (enableTransactions) {
                 producer.initTransactions();
                 producer.beginTransaction();
             }
             for (final KeyValue<K, V> record : records) {
-                final Future<RecordMetadata> f = producer.send(
-                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-                f.get();
+                producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
```

Review comment: Previously we waited after sending each record; here we wait only once after sending all records, so it is more efficient. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
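The efficiency point in this review generalizes beyond the Kafka producer. The sketch below is a generic illustration using plain Java futures (not the producer API): blocking on each asynchronous send before issuing the next one serializes the pipeline, whereas firing all sends and waiting once still makes the batch as a whole synchronous while letting the sends overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BatchWait {
    // Stand-in for an async send; completes with its payload as the "ack".
    static CompletableFuture<Integer> sendAsync(int payload) {
        return CompletableFuture.supplyAsync(() -> payload);
    }

    // Fire every send first, then wait once for the whole batch -- analogous
    // to calling send() in a loop and relying on a single flush()/close() to
    // make the batch synchronous, instead of get() after every send.
    static List<Integer> sendBatchThenWait(List<Integer> payloads) {
        List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
        for (int p : payloads) {
            inFlight.add(sendAsync(p)); // no blocking between sends
        }
        List<Integer> acks = new ArrayList<>();
        for (CompletableFuture<Integer> f : inFlight) {
            acks.add(f.join()); // one wait at the end covers the whole batch
        }
        return acks;
    }

    public static void main(String[] args) {
        System.out.println(sendBatchThenWait(List.of(1, 2, 3))); // [1, 2, 3]
    }
}
```

The trade-off is the same one the reviewers discuss: per-record waiting guarantees ordering of completions at the cost of throughput, while batch-level waiting only guarantees that everything completed by the time the method returns.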
[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416943907 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java ## @@ -337,8 +336,11 @@ public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws TestUtils.waitForCondition( () -> { try { -final ReadOnlyKeyValueStore store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); +final ReadOnlyKeyValueStore store = IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore()); + +if (store == null) +return false; Review comment: ack. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416942992 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ## @@ -810,21 +808,9 @@ private void writeInputData(final List> records) throws Exc } private void verifyStateStore(final KafkaStreams streams, - final Set> expectedStoreContent) { -ReadOnlyKeyValueStore store = null; - -final long maxWaitingTime = System.currentTimeMillis() + 30L; -while (System.currentTimeMillis() < maxWaitingTime) { -try { -store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); -break; -} catch (final InvalidStateStoreException okJustRetry) { -try { -Thread.sleep(5000L); -} catch (final Exception ignore) { } -} -} - + final Set> expectedStoreContent) throws InterruptedException { +final ReadOnlyKeyValueStore store = IntegrationTestUtils +.getStore(30L, storeName, streams, QueryableStoreTypes.keyValueStore()); Review comment: Ack. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fantayeneh opened a new pull request #8577: use appropriate fn for readability. (maybe)
fantayeneh opened a new pull request #8577: URL: https://github.com/apache/kafka/pull/8577 Using min/max might make the code a little easier to read. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-9875) Flaky Test SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown[exactly_once]
[ https://issues.apache.org/jira/browse/KAFKA-9875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-9875: --- Assignee: John Roesler > Flaky Test > SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown[exactly_once] > -- > > Key: KAFKA-9875 > URL: https://issues.apache.org/jira/browse/KAFKA-9875 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Assignee: John Roesler >Priority: Major > Labels: flaky-test, unit-test > > h3. Stacktrace > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The request timed out. at > org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:211) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteAllTopicsAndWait(EmbeddedKafkaCluster.java:300) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest(IntegrationTestUtils.java:148) > at > org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown(SuppressionDurabilityIntegrationTest.java:246) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
vvcephei commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416921971 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ## @@ -810,21 +808,9 @@ private void writeInputData(final List> records) throws Exc } private void verifyStateStore(final KafkaStreams streams, - final Set> expectedStoreContent) { -ReadOnlyKeyValueStore store = null; - -final long maxWaitingTime = System.currentTimeMillis() + 30L; -while (System.currentTimeMillis() < maxWaitingTime) { -try { -store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); -break; -} catch (final InvalidStateStoreException okJustRetry) { -try { -Thread.sleep(5000L); -} catch (final Exception ignore) { } -} -} - + final Set> expectedStoreContent) throws InterruptedException { +final ReadOnlyKeyValueStore store = IntegrationTestUtils +.getStore(30L, storeName, streams, QueryableStoreTypes.keyValueStore()); Review comment: ```suggestion .getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore()); ``` ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java ## @@ -337,8 +336,11 @@ public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws TestUtils.waitForCondition( () -> { try { -final ReadOnlyKeyValueStore store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); +final ReadOnlyKeyValueStore store = IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore()); + +if (store == null) +return false; Review comment: not a huge deal, but technically, these should have brackets. 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -599,13 +595,6 @@ public static void waitForCompletion(final KafkaStreams streams, return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false); } -public static List> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig, Review comment: thanks for the cleanup ## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final * @param Key type of the data records * @param Value type of the data records */ -@SuppressWarnings("WeakerAccess") public static void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection> records, final Properties producerConfig, final Headers headers, final Long timestamp, - final boolean enableTransactions) -throws ExecutionException, InterruptedException { + final boolean enableTransactions) { try (final Producer producer = new KafkaProducer<>(producerConfig)) { if (enableTransactions) { producer.initTransactions(); producer.beginTransaction(); } for (final KeyValue record : records) { -final Future f = producer.send( -new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); -f.get(); +producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); Review comment: I guess the flush at the end makes it synchronous anyway? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
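The retry loop this PR replaces (sleep, retry on `InvalidStateStoreException`, give up at a deadline) follows a common pattern. Below is a generic stand-alone sketch of that pattern; the class name, method name, and timeout semantics are illustrative, not the actual `IntegrationTestUtils.getStore` API.

```java
import java.util.function.Supplier;

public class RetryUtil {
    // Retries the supplier until it returns without throwing, or until
    // timeoutMs elapses; returns null on timeout (like a store that never
    // became queryable). Always tries at least once.
    static <T> T retryUntil(Supplier<T> supplier, long timeoutMs, long retryIntervalMs)
            throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return supplier.get();
            } catch (RuntimeException notReadyYet) { // e.g. InvalidStateStoreException
                if (System.currentTimeMillis() >= deadline) {
                    return null;
                }
                Thread.sleep(retryIntervalMs);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final int[] attempts = {0};
        // Fails twice, then succeeds -- mimics a store that is mid-rebalance.
        Integer result = retryUntil(() -> {
            if (attempts[0]++ < 2) {
                throw new IllegalStateException("state store not yet queryable");
            }
            return 42;
        }, 5_000L, 10L);
        System.out.println(result);
    }
}
```

Centralizing the loop in a utility also avoids the kind of unit slip the review caught above, where a deadline of `30L` milliseconds stood in for an intended 30 seconds.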
[jira] [Assigned] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
[ https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-9928: -- Assignee: Matthias J. Sax > Flaky > GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta] > - > > Key: KAFKA-9928 > URL: https://issues.apache.org/jira/browse/KAFKA-9928 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Matthias J. Sax >Priority: Major > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 3. waiting for > final values > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) > at > org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178) > {code} > I looked at the below examples: > https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/ > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/ > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/ > And also reproduced the flakiness locally after about 180 runs, and the > failed one did not have any obvious 
different traces compared with the > successful ones. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
[ https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9928: --- Component/s: unit tests streams > Flaky > GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta] > - > > Key: KAFKA-9928 > URL: https://issues.apache.org/jira/browse/KAFKA-9928 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Major > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 3. waiting for > final values > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) > at > org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178) > {code} > I looked at the below examples: > https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/ > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/ > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/ > And also reproduced the flakiness locally after about 180 runs, and the > failed one did not have any obvious different traces compared 
with the > successful ones. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] fantayeneh opened a new pull request #8576: format with correct syntax
fantayeneh opened a new pull request #8576: URL: https://github.com/apache/kafka/pull/8576 Small change: fix string formatting. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416916295
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
## @@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 public class HighAvailabilityTaskAssignorTest {
-private long acceptableRecoveryLag = 100L;
-private int balanceFactor = 1;
-private int maxWarmupReplicas = 2;
-private int numStandbyReplicas = 0;
-private long probingRebalanceInterval = 60 * 1000L;
-
-private Map clientStates = new HashMap<>();
-private Set allTasks = new HashSet<>();
-private Set statefulTasks = new HashSet<>();
-
-private ClientState client1;
-private ClientState client2;
-private ClientState client3;
-
-private HighAvailabilityTaskAssignor taskAssignor;
-
-private void createTaskAssignor() {
-final AssignmentConfigs configs = new AssignmentConfigs(
-acceptableRecoveryLag,
-balanceFactor,
-maxWarmupReplicas,
-numStandbyReplicas,
-probingRebalanceInterval
-);
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-configs);
-}
+private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 0,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
+
+private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 1,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
-@Test
-public void shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() {
-client1 = EasyMock.createNiceMock(ClientState.class);
-expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0));
-expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS);
-replay(client1);
-allTasks = mkSet(TASK_0_0, TASK_0_1);
-clientStates = singletonMap(UUID_1, client1);
-createTaskAssignor();
-assertFalse(taskAssignor.previousAssignmentIsValid());
Review comment: Since you have a follow-on PR that touches this method, I'll leave it alone and just proceed to merge. We should consider both of these options in the follow-on. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on pull request #8568: URL: https://github.com/apache/kafka/pull/8568#issuecomment-620847306 I looked at the three failed tests:
* `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` is actually due to the issue that https://github.com/apache/kafka/pull/8548 tried to fix. Waiting for @vvcephei to review 8548.
* `EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]` is being looked at by @mjsax as KAFKA-9831.
* `GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]` is a new issue; I created KAFKA-9928 for it, and my gut feeling is that it has the same root cause as KAFKA-9831. (also cc @mjsax)
So I think this PR is good to be merged.
[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support
vvcephei commented on pull request #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-620847005 Whew! System tests passed: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-04-28--001.1588064884--ConcurrencyPractitioner--EMIT-ON-CHANGE--ddbf2cf/report.html
[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
vvcephei commented on pull request #8540: URL: https://github.com/apache/kafka/pull/8540#issuecomment-620846462
[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
vvcephei commented on pull request #8540: URL: https://github.com/apache/kafka/pull/8540#issuecomment-620846265
[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416910725
## File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
## @@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { produceGlobalTableValues();
-final ReadOnlyKeyValueStore replicatedStore =
- kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+final ReadOnlyKeyValueStore replicatedStore = IntegrationTestUtils
+.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
+assertNotNull(replicatedStore);
Review comment: Since previously we would just throw the exception with the un-wrapped call, asserting it is not null here is equivalent to making sure that the store is indeed returned.
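The retry-based `getStore` used in this change keeps polling until the store becomes queryable instead of throwing on the first failed lookup. A generic sketch of that retry pattern (hypothetical helper and names, not the actual `IntegrationTestUtils.getStore` signature) might look like:

```java
import java.util.function.Supplier;

public class RetryDemo {
    // Hypothetical retry helper: keeps calling the supplier until it succeeds
    // or the attempt budget is exhausted, backing off between attempts. This
    // mimics the idea of retrying while a store is not yet queryable.
    public static <T> T retry(Supplier<T> action, int maxAttempts, long backoffMs) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;                          // remember the failure and back off
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
        throw last;                                // give up after maxAttempts failures
    }

    // Demo: a "store lookup" that fails twice before succeeding.
    private static int calls = 0;
    public static String flakyLookup() {
        if (++calls < 3) {
            throw new IllegalStateException("store not yet queryable");
        }
        return "store";
    }

    public static void main(String[] args) {
        System.out.println(retry(RetryDemo::flakyLookup, 5, 1L)); // prints "store"
    }
}
```

With this shape, a non-null return doubles as proof that the store was eventually obtained, which is why the `assertNotNull` in the diff above is sufficient.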
[jira] [Created] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
Guozhang Wang created KAFKA-9928: Summary: Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta] Key: KAFKA-9928 URL: https://issues.apache.org/jira/browse/KAFKA-9928 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang
{code}
Stacktrace
java.lang.AssertionError: Condition not met within timeout 3. waiting for final values
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
	at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
	at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
	at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
	at org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178)
{code}
I looked at the below examples:
https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/
https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/
https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/
I also reproduced the flakiness locally after about 180 runs, and the failed run did not have any obviously different traces compared with the successful ones. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9921: --- Fix Version/s: 2.5.1
> Caching is not working properly with WindowStateStore when retaining
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.5.0
> Reporter: Georgi Petkov
> Assignee: Sophie Blee-Goldman
> Priority: Major
> Fix For: 2.6.0, 2.5.1
>
> I'm using the current latest version 2.5.0, but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for the _retainDuplicates_ parameter):
> _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, retentionPeriod, windowSize, *true*), keySerde, valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that order, when reading them through the iterator I'll get the values *4, 2, 3, 4*.
> I've done a bit of investigation myself and the problem is that *the whole caching feature is written without consideration of the case where duplicates are retained*.
> The observed behavior is due to having the last value in the cache (and it can have only one since it's not aware of the retain-duplicates option) and it is read first (while skipping the first value from the RocksDB iterator even though the values are different). This can be observed (for version 2.5.0) in _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 values are read from the RocksDB iterator, so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option, so there are other examples of incorrect behavior, like in _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without increasing the complexity to linear for a single event (which would be the case if I was always reading the current list, appending one value, and writing it back). So I go for _List<KeyValuePair<K, V>>_ instead of _KeyValuePair<K, List<V>>_. The whole use case is more complex than that, so I use _#transformValues_ and state stores.
> So as an impact, I can't use caching on my state stores. For others - they'll have incorrect behavior that may take a lot of time to be discovered and even more time to fix the results.
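The skipping behavior described in the report can be illustrated with a small, self-contained simulation (hypothetical class and method names; this only mimics the merge logic, it is not Kafka's actual `AbstractMergedSortedCacheStoreIterator`): the cache holds a single value per key, so when the cache head and store head carry the same key, the merged view yields the cached value and skips one store entry, turning the stored duplicates 1, 2, 3, 4 into 4, 2, 3, 4.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class MergedIteratorDemo {
    // Simulates merging a single-value-per-key cache with a duplicate-retaining
    // store, the way a merged cache/store iterator compares heads: when both
    // sides report the same key, the cached value wins and one store entry is
    // skipped -- even though, with duplicates retained, the values may differ.
    public static List<Integer> readAll(List<Integer> storeValues, Integer cachedValue) {
        List<Integer> result = new ArrayList<>();
        Iterator<Integer> store = storeValues.iterator();
        if (cachedValue != null && store.hasNext()) {
            result.add(cachedValue); // cache head is returned first...
            store.next();            // ...and the matching store head is skipped
        }
        while (store.hasNext()) {
            result.add(store.next()); // remaining duplicates come from the store
        }
        return result;
    }

    public static void main(String[] args) {
        // Four puts of the same key with values 1, 2, 3, 4; the cache keeps only 4.
        System.out.println(readAll(List.of(1, 2, 3, 4), 4)); // prints [4, 2, 3, 4]
    }
}
```

This is why the fix merged for this ticket disables caching when duplicates are retained rather than trying to make the merge logic duplicate-aware.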
[GitHub] [kafka] gwenshap commented on pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test
gwenshap commented on pull request #8518: URL: https://github.com/apache/kafka/pull/8518#issuecomment-620826970 ok to test
[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094844#comment-17094844 ] Sophie Blee-Goldman commented on KAFKA-9921: I'm resolving the ticket because the PR to disable caching + duplicates and note this in the javadocs was just merged. If you have the chance to take a quick look and let me know if there's anything I missed clarifying in the docs, I can submit a quick followup PR or review one from you if you have something specific in mind.
[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094839#comment-17094839 ] Sophie Blee-Goldman commented on KAFKA-9921: I take it you're using RocksDB, by the way? If you are (or can) use the in-memory stores, then storing a list and appending should be pretty fast. On that note, I'm actually not sure storing the entire list would be slower than storing individual duplicate records even with RocksDB. I actually have a suspicion that it might even be faster to store as a list, assuming the number and size of duplicates isn't incredibly large (relative to the memtable and block size scale).
[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094835#comment-17094835 ] Sophie Blee-Goldman commented on KAFKA-9921:
> For 2 puts I would expect 2 entries regardless if they accidentally match
Fair enough. I guess for that reason caching and duplicates are inherently incompatible, right? Regarding putting _null_ values, I think the behavior with _retainDuplicates_ is as expected. The Streams library uses window stores with duplicates for stream-stream joins, for which a null value produces no output and isn't considered a tombstone (see the [semantics of stream-stream joins|https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kstream-kstream-join] section). I'm starting to get a better sense of what you're trying to do here, but it sounds like the semantics you want might differ slightly from what Streams would consider a stream-stream join. Do you explicitly want a windowed join, or are you just using the window store because the retention policy will keep state from growing without bound? Does your use case require _null_ values to be treated as deletes? By the way, if the built-in stores don't match your requirements exactly, you can always plug in a custom store. You could even just wrap one of the built-in stores to reuse the pieces that work for you and skip the ones that don't. The RocksDB WindowStore is actually just built out of segments of the RocksDB KeyValueStore, for example.
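Wrapping a built-in store, as suggested in the comment above, can be sketched with a tiny decorator (the `SimpleStore` interface here is hypothetical and far simpler than Kafka's actual `StateStore`/`WindowStore` APIs): the wrapper delegates storage to the inner store and layers on the semantics the built-in store lacks, such as treating null values as deletes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class WrappedStoreDemo {
    // Hypothetical, minimal stand-in for a store interface.
    public interface SimpleStore<K, V> {
        void put(K key, V value);
        V get(K key);
    }

    // A "built-in" in-memory store with no tombstone semantics.
    public static class InMemoryStore<K, V> implements SimpleStore<K, V> {
        private final Map<K, V> map = new HashMap<>();
        @Override public void put(K key, V value) { map.put(key, value); }
        @Override public V get(K key) { return map.get(key); }
    }

    // A wrapper that reuses the inner store but adds its own semantics:
    // here, treating null values as deletes (tombstones).
    public static class TombstoneAwareStore<K, V> implements SimpleStore<K, V> {
        private final SimpleStore<K, V> inner;
        private final Set<K> deleted = new HashSet<>();
        public TombstoneAwareStore(SimpleStore<K, V> inner) { this.inner = inner; }
        @Override public void put(K key, V value) {
            if (value == null) {
                deleted.add(key);          // remember the tombstone
            } else {
                deleted.remove(key);
                inner.put(key, value);     // delegate the real write
            }
        }
        @Override public V get(K key) {
            return deleted.contains(key) ? null : inner.get(key);
        }
    }

    public static void main(String[] args) {
        SimpleStore<String, Integer> store = new TombstoneAwareStore<>(new InMemoryStore<>());
        store.put("a", 1);
        store.put("a", null);               // treated as a delete by the wrapper
        System.out.println(store.get("a")); // prints null
    }
}
```

The same delegation pattern applies to real Streams stores: forward the calls whose behavior suits you and override the ones whose semantics you need to change.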
[GitHub] [kafka] abbccdda commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test
abbccdda commented on a change in pull request #8518: URL: https://github.com/apache/kafka/pull/8518#discussion_r416879514
## File path: tests/kafkatest/tests/core/downgrade_test.py
## @@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, security_protocol): version=kafka_version) self.producer.start()
+static_membership = kafka_version == DEV_BRANCH or kafka_version >= LATEST_2_3
Review comment: I see, makes sense.
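The `static_membership` gate above enables the feature only for versions that support it (static group membership shipped in Kafka 2.3). The same check can be sketched as a plain version comparison (hypothetical helper name, not part of the test framework):

```java
public class VersionGateDemo {
    // Hypothetical (major, minor) check mirroring the Python gate
    // `kafka_version >= LATEST_2_3`: static group membership shipped in
    // Kafka 2.3, so only 2.3+ versions should enable it in the test matrix.
    public static boolean supportsStaticMembership(int major, int minor) {
        return major > 2 || (major == 2 && minor >= 3);
    }

    public static void main(String[] args) {
        System.out.println(supportsStaticMembership(2, 5)); // prints true
        System.out.println(supportsStaticMembership(2, 2)); // prints false
    }
}
```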
[GitHub] [kafka] kkonstantine commented on a change in pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed
kkonstantine commented on a change in pull request #8204: URL: https://github.com/apache/kafka/pull/8204#discussion_r416866064
## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
## @@ -220,6 +220,8 @@ public void stop() { workerMetricsGroup.close(); connectorStatusMetricsGroup.close();
+
+workerConfigTransformer.close();
Review comment: Looking at the initialization of `workerConfigTransformer` I see it should be made final. And then I notice that this is the case for `connectorClientConfigOverridePolicy` and all the class members of `ConnectorStatusMetricsGroup`. @tombentley do you mind tightening these types as well?
## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
## @@ -98,4 +101,8 @@ public void onCompletion(Throwable error, Void result) { HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb); connectorRequests.put(path, request); }
+
+public void close() {
Review comment: should we also change this class to implement `AutoCloseable`? This can't be used immediately in a try-with-resources clause, but it's probably better to signal the existence of this method at the class level.
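The reviewer's `AutoCloseable` suggestion can be sketched as follows (hypothetical classes, loosely modeled on the discussion above, not Connect's actual `Worker`/`WorkerConfigTransformer`): implementing the interface advertises `close()` at the type level even though the object outlives any single try-with-resources block, and the owning component closes it during shutdown.

```java
public class CloseableDemo {
    // Hypothetical long-lived component with resources to release on close().
    public static class ConfigTransformer implements AutoCloseable {
        private boolean closed = false;

        public String transform(String raw) {
            if (closed) {
                throw new IllegalStateException("transformer is closed");
            }
            return raw.trim();
        }

        // Implementing AutoCloseable signals the existence of close() at the
        // class level, even when try-with-resources does not apply.
        @Override
        public void close() {
            closed = true; // release config providers, caches, etc.
        }
    }

    // The owner closes it during shutdown, mirroring Worker.stop().
    public static class Worker {
        final ConfigTransformer configTransformer = new ConfigTransformer();

        public void stop() {
            configTransformer.close();
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        System.out.println(worker.configTransformer.transform("  value  ")); // prints "value"
        worker.stop();
    }
}
```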
[GitHub] [kafka] kkonstantine commented on pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed
kkonstantine commented on pull request #8204: URL: https://github.com/apache/kafka/pull/8204#issuecomment-620803829 ok to test
[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094805#comment-17094805 ] Guozhang Wang commented on KAFKA-9925: -- Ah yes!! Hope we can get KIP-591 by 2.6 :) > Non-key KTable Joining may result in duplicate schema name in confluence > schema registry > > > Key: KAFKA-9925 > URL: https://issues.apache.org/jira/browse/KAFKA-9925 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.1 >Reporter: Kin Siu >Assignee: John Roesler >Priority: Major > > The second half of issue Andy Bryant reported in KAFKA-9390 looks like still > exist. > When testing non-key join method without passing in "Named", I noticed that > there are schema subjects registered in confluent schema registry without > consumer group Id still, > e.g. > {noformat} > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value" > {noformat} > Code in KTableImpl which constructed above naming : > https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959 > When we have multiple topologies using foreignKey join and registered to same > schema registry, we can have a name clash, and fail to register schema. > In order to clean up these schema subjects, we will need to know the internal > naming of a consumer group's topology, which is not straightforward and error > prone. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata
[ https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094800#comment-17094800 ] Matthias J. Sax commented on KAFKA-7317: Sweet! > Use collections subscription for main consumer to reduce metadata > - > > Key: KAFKA-7317 > URL: https://issues.apache.org/jira/browse/KAFKA-7317 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.5.0 > > > In KAFKA-4633 we switched from "collection subscription" to "pattern > subscription" for `Consumer#subscribe()` to avoid triggering auto topic > creating on the broker. In KAFKA-5291, the metadata request was extended to > overwrite the broker config within the request itself. However, this feature > is only used in `KafkaAdminClient`. KAFKA-7320 adds this feature for the > consumer client, too. > This ticket proposes to use the new feature within Kafka Streams to allow the > usage of collection based subscription in consumer and admit clients to > reduce the metadata response size than can be very large for large number of > partitions in the cluster. > Note, that Streams need to be able to distinguish if it connects to older > brokers that do not support the new metadata request and still use pattern > subscription for this case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9127) Needless group coordination overhead for GlobalKTables
[ https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094801#comment-17094801 ] Sophie Blee-Goldman commented on KAFKA-9127: Yep, if you can kick off tests on that PR and give it another pass it should fix both issues and we can backport it to 2.5 > Needless group coordination overhead for GlobalKTables > -- > > Key: KAFKA-9127 > URL: https://issues.apache.org/jira/browse/KAFKA-9127 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Chris Toomey >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0 > > > When creating a simple stream topology to just populate a GlobalKTable, I > noticed from logging that the stream consumer was doing group coordination > requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to > do since the global consumer thread populating the table fetches from all > partitions and thus doesn't use the group requests. So this adds needless > overhead on the client, network, and server. > I tracked this down to the stream thread consumer, which is created > regardless of whether it's needed based solely on NUM_STREAM_THREADS_CONFIG > which defaults to 1 I guess. > I found that setting NUM_STREAM_THREADS_CONFIG to 0 will prevent this from > happening, but it'd be a worthwhile improvement to be able to override this > setting in cases of topologies like this that don't have any need for stream > threads. Hence this ticket. > I originally asked about this on the users mailing list where Bruno suggested > I file it as an improvement request. > Here's the Scala code that I'm using that exhibits this: > {code:scala} > val builder: StreamsBuilder = new StreamsBuilder() > val gTable = builder.globalTable[K, V](...) > val stream = new KafkaStreams(builder.build(), props) > stream.start(){code} > Not shown is the state store that I'm populating/using. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9925) Non-key KTable joining may result in duplicate schema name in Confluent schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094799#comment-17094799 ] Matthias J. Sax commented on KAFKA-9925: {quote}I'm wondering if now is a good time to deprecate the `StreamsBuilder#build()` function to let users use `build(final Properties props)` instead as a tiny KIP.{quote} Just FYI: this is already proposed in KIP-591. However, IMHO, we should fix it for older versions, too?
[GitHub] [kafka] kkonstantine commented on pull request #8204: Ensure ConfigProviders are closed
kkonstantine commented on pull request #8204: URL: https://github.com/apache/kafka/pull/8204#issuecomment-620795402 retest this please
[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata
[ https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094797#comment-17094797 ] Sophie Blee-Goldman commented on KAFKA-7317: Also fixed via https://github.com/apache/kafka/pull/8540
[jira] [Commented] (KAFKA-9127) Needless group coordination overhead for GlobalKTables
[ https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094791#comment-17094791 ] Matthias J. Sax commented on KAFKA-9127: {quote}Does it qualify as a regression when the workaround is the same as the fix? {quote} IMHO, it does, because if you don't change any code/configs and upgrade to 2.5 it breaks. Btw: setting the number of threads to zero exposes a different bug: the client does not transition to state RUNNING > Needless group coordination overhead for GlobalKTables > -- > > Key: KAFKA-9127 > URL: https://issues.apache.org/jira/browse/KAFKA-9127 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Chris Toomey >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0 > > > When creating a simple stream topology to just populate a GlobalKTable, I > noticed from logging that the stream consumer was doing group coordination > requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to > do since the global consumer thread populating the table fetches from all > partitions and thus doesn't use the group requests. So this adds needless > overhead on the client, network, and server. > I tracked this down to the stream thread consumer, which is created > regardless of whether it's needed based solely on NUM_STREAM_THREADS_CONFIG > which defaults to 1 I guess. > I found that setting NUM_STREAM_THREADS_CONFIG to 0 will prevent this from > happening, but it'd be a worthwhile improvement to be able to override this > setting in cases of topologies like this that don't have any need for stream > threads. Hence this ticket. > I originally asked about this on the users mailing list where Bruno suggested > I file it as an improvement request. > Here's the Scala code that I'm using that exhibits this: > {code:scala} > val builder: StreamsBuilder = new StreamsBuilder() > val gTable = builder.globalTable[K, V](...) 
> val stream = new KafkaStreams(builder.build(), props) > stream.start(){code} > Not shown is the state store that I'm populating/using.
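The workaround discussed in this thread (setting the number of stream threads to zero for a topology that only populates a GlobalKTable) can be sketched with plain `Properties`. This is a minimal sketch, assuming the standard config key `num.stream.threads` behind `StreamsConfig.NUM_STREAM_THREADS_CONFIG`; the application id and bootstrap address are placeholders.

```java
import java.util.Properties;

// Minimal sketch of the workaround: num.stream.threads = 0 avoids creating
// the stream-thread consumer that performs the needless group coordination
// requests. "global-table-app" and the bootstrap address are placeholders.
class GlobalTableOnlyConfig {
    static Properties workaroundProps() {
        Properties props = new Properties();
        props.put("application.id", "global-table-app");
        props.put("bootstrap.servers", "localhost:9092");
        // StreamsConfig.NUM_STREAM_THREADS_CONFIG; defaults to 1
        props.put("num.stream.threads", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(workaroundProps().getProperty("num.stream.threads"));
    }
}
```

These props would then be passed to the `KafkaStreams` constructor as usual; note the comment above that zero threads currently also exposes a second bug (the client never transitions to RUNNING).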
[jira] [Comment Edited] (KAFKA-9127) Needless group coordination overhead for GlobalKTables
[ https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092826#comment-17092826 ] Matthias J. Sax edited comment on KAFKA-9127 at 4/28/20, 6:54 PM: -- [~ableegoldman] Seems we introduced a regression in 2.5.0 via KAFKA-7317 that would be fixed with this ticket (cf. [https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic]). If you agree, we should cherry-pick the fix to 2.5 branch. And also add a corresponding test. Thoughts? was (Author: mjsax): [~ableegoldman] Seems we introduced a regression in 2.5.0 via KAFKA-7317 that would be fixed with this ticket (cf. [https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic]). If you agree, we should cherry-pick the fix ti 2.5 branch. And also add a corresponding test. Thoughts?
[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata
[ https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094781#comment-17094781 ] Matthias J. Sax commented on KAFKA-7317: As reported on SO, when setting the number of threads to zero, the client state never goes to RUNNING. Sounds like another bug?
[jira] [Commented] (KAFKA-9916) Materialize Table-Table Join Result to Avoid Performing Same Join Twice
[ https://issues.apache.org/jira/browse/KAFKA-9916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094777#comment-17094777 ] Matthias J. Sax commented on KAFKA-9916: The original example was slightly different: {code:java} KStream stream = ... stream.filter((k,v) -> { v.setA("a"); return true; }); stream.filter((k,v) -> ...);{code} For this case, the filters are not chained but executed in parallel, which is basically a broadcast pattern, i.e., each record of `stream` is piped into both filters; conceptually, we would need to duplicate the input record, but as an optimization we don't copy it and instead pass the same object twice. > Materialize Table-Table Join Result to Avoid Performing Same Join Twice > --- > > Key: KAFKA-9916 > URL: https://issues.apache.org/jira/browse/KAFKA-9916 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Priority: Major > > If a table-table join processor performs a join and the join needs to forward > downstream the old join result (e.g. due to an aggregation operation > downstream), it performs the same join (i.e. calls the {{ValueJoiner}}) twice. > Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}} > with the same keys and input into the join operation in this order, the join > processor at some point will join {{L1}} with {{R1}}. When the new right > value {{R2}} triggers the join, it will join {{L1}} with {{R2}} and again > {{L1}} with {{R1}}. > We could avoid calling the {{ValueJoiner}} twice by materializing the join > result. We would trade a call to the {{ValueJoiner}} with a lookup into a > state store. Depending on the logic in the {{ValueJoiner}} this may or may > not improve the performance. However, calling the {{ValueJoiner}} once will > only access the input values of the {{ValueJoiner}} once, which avoids the > need to copy the input values each time the {{ValueJoiner}} is called. 
For > example, consider the following {{ValueJoiner}}: > {code:java} > private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) { > leftValue.setSomeValue(rightValue); > return leftValue; > } > {code} > With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice > when {{R2}} triggers the join, the first time with {{R2}} and the second time > with {{R1}}. That means {{R2}} will be overwritten by {{R1}}, which is > probably not what the users want. To get the correct result, the > {{ValueJoiner}} should be implemented as follows: > > {code:java} > private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) { > ComplexValue copy = copy(leftValue); > copy.setSomeValue(rightValue); > return copy; > } > {code} > Copying values during joins could be avoided if the join result were > materialized.
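The overwrite problem described in the ticket can be reproduced in plain Java. `ComplexValue` and the two joiner methods below are hypothetical stand-ins modeled on the ticket's example, not Kafka Streams code: the mutating variant lets the second join call (L1 with R1) clobber the result of the first (L1 with R2), while the copying variant keeps each result independent.

```java
// Plain-Java model of the double-join mutation issue; all names here are
// hypothetical stand-ins for the user's types in the ticket's example.
class JoinerCopyDemo {
    static class ComplexValue {
        Long someValue;
        ComplexValue copy() {
            ComplexValue c = new ComplexValue();
            c.someValue = someValue;
            return c;
        }
    }

    // Buggy variant: mutates and returns the shared left value.
    static ComplexValue mutatingJoin(ComplexValue left, Long right) {
        left.someValue = right;
        return left;
    }

    // Safe variant: copy before mutating, as the ticket recommends.
    static ComplexValue copyingJoin(ComplexValue left, Long right) {
        ComplexValue c = left.copy();
        c.someValue = right;
        return c;
    }

    public static void main(String[] args) {
        ComplexValue l1 = new ComplexValue();
        ComplexValue newResult = mutatingJoin(l1, 2L); // join L1 with R2
        mutatingJoin(l1, 1L);                          // then L1 with R1 (old result)
        System.out.println(newResult.someValue);       // prints 1: R2 was overwritten by R1

        ComplexValue l2 = new ComplexValue();
        ComplexValue safeNew = copyingJoin(l2, 2L);
        copyingJoin(l2, 1L);
        System.out.println(safeNew.someValue);         // prints 2: result preserved
    }
}
```

Materializing the join result, as proposed, would make the defensive copy unnecessary.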
[GitHub] [kafka] guozhangwang commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id
guozhangwang commented on pull request #8574: URL: https://github.com/apache/kafka/pull/8574#issuecomment-620787049 cc @abbccdda @mjsax to take a look?
[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094772#comment-17094772 ] Guozhang Wang commented on KAFKA-9925: -- [~vvcephei] Thanks for taking a look into this issue. I'm wondering if now is a good time to deprecate the `StreamsBuilder#build()` function to let users use `build(final Properties props)` instead, as a tiny KIP. There's a risk, of course, that the props passed to `build` are not the same as the ones passed into the `KafkaStreams` constructor. I think we can remember the reference of the props when building the topology, and then at construction, if we find they are not the same (by reference), we can log a warning such as "found the topology is built with some StreamsConfig already, which is not the same as the config passed in the constructor". > Non-key KTable Joining may result in duplicate schema name in confluence > schema registry > > > Key: KAFKA-9925 > URL: https://issues.apache.org/jira/browse/KAFKA-9925 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.1 >Reporter: Kin Siu >Assignee: John Roesler >Priority: Major > > The second half of the issue Andy Bryant reported in KAFKA-9390 looks like it still > exists. > When testing the non-key join method without passing in "Named", I noticed that > there are still schema subjects registered in the confluent schema registry without > the consumer group id, > e.g. 
> {noformat} > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value" > {noformat} > Code in KTableImpl which constructs the above naming: > https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959 > When we have multiple topologies using foreignKey join and registered to the same > schema registry, we can have a name clash and fail to register the schema. > In order to clean up these schema subjects, we will need to know the internal > naming of a consumer group's topology, which is not straightforward and is error > prone.
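The by-reference check Guozhang proposes could look roughly like the sketch below. `Topology`, `build`, and `construct` here are minimal stand-ins, not the real Kafka Streams API; the point is only the mechanics of remembering the `Properties` reference at build time and comparing it (by identity, not equality) at construction time.

```java
import java.util.Properties;

// Hedged sketch of the proposed reference check; these classes are
// stand-ins for StreamsBuilder/Topology/KafkaStreams, not the real API.
class ConfigReferenceCheck {
    static class Topology {
        final Properties builtWith; // remember the reference used at build time
        Topology(Properties builtWith) { this.builtWith = builtWith; }
    }

    static Topology build(Properties props) {
        return new Topology(props);
    }

    static String construct(Topology topology, Properties ctorProps) {
        // Compare by reference (==), as the comment suggests, not by equals()
        if (topology.builtWith != null && topology.builtWith != ctorProps) {
            return "WARN: found the topology is built with some StreamsConfig already, "
                 + "which is not the same as the config passed in the constructor";
        }
        return "OK";
    }

    public static void main(String[] args) {
        Properties buildProps = new Properties();
        Properties otherProps = new Properties();
        System.out.println(construct(build(buildProps), buildProps)); // OK
        System.out.println(construct(build(buildProps), otherProps)); // WARN: ...
    }
}
```

A reference comparison deliberately flags two distinct-but-equal `Properties` objects, since the concern is users wiring different config instances into `build` and the constructor.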
[GitHub] [kafka] guozhangwang commented on pull request #8564: KAFKA-9921: disable caching on stores configured to retain duplicates
guozhangwang commented on pull request #8564: URL: https://github.com/apache/kafka/pull/8564#issuecomment-620783397 Also cherry-picked to 2.5.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
ableegoldman commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416814163 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java ## @@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { produceGlobalTableValues(); -final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); +final ReadOnlyKeyValueStore replicatedStore = IntegrationTestUtils +.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); +assertNotNull(replicatedStore); Review comment: Why do we have to check for null now?
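The retry idea behind the `getStore` change can be sketched generically. The helper below is a hypothetical stand-in, not the actual `IntegrationTestUtils.getStore` implementation, but it illustrates why the caller now asserts non-null: a poll-until-timeout helper typically yields null when the store never becomes available.

```java
import java.util.function.Supplier;

// Hedged sketch of a poll-until-ready retry helper; a stand-in for the
// retry logic discussed in the PR, not Kafka's actual test utility.
class RetryUtil {
    static <T> T retryUntilNonNull(Supplier<T> supplier, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            T result = supplier.get();
            if (result != null) {
                return result;
            }
            if (System.currentTimeMillis() >= deadline) {
                return null; // timed out; callers must check for null
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(retryUntilNonNull(() -> "store", 1000, 10)); // available at once
        System.out.println(retryUntilNonNull(() -> null, 50, 10));      // never available
    }
}
```

Under this design the null check in the test is not redundant: it converts a silent timeout into an explicit assertion failure.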
[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094729#comment-17094729 ] John Roesler commented on KAFKA-9925: - Ok, I've opened [https://github.com/apache/kafka/pull/8574] . If you have the time, a review would help speed things along. Thanks for the report!
[GitHub] [kafka] ableegoldman commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
ableegoldman commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416810713 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java ## @@ -41,132 +54,107 @@ import static org.easymock.EasyMock.replay; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; -import org.easymock.EasyMock; -import org.junit.Test; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; public class HighAvailabilityTaskAssignorTest { -private long acceptableRecoveryLag = 100L; -private int balanceFactor = 1; -private int maxWarmupReplicas = 2; -private int numStandbyReplicas = 0; -private long probingRebalanceInterval = 60 * 1000L; - -private Map clientStates = new HashMap<>(); -private Set allTasks = new HashSet<>(); -private Set statefulTasks = new HashSet<>(); - -private ClientState client1; -private ClientState client2; -private ClientState client3; - -private HighAvailabilityTaskAssignor taskAssignor; - -private void createTaskAssignor() { -final AssignmentConfigs configs = new AssignmentConfigs( -acceptableRecoveryLag, -balanceFactor, -maxWarmupReplicas, -numStandbyReplicas, -probingRebalanceInterval -); -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -configs); -} +private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*balanceFactor*/ 1, +/*maxWarmupReplicas*/ 2, 
+/*numStandbyReplicas*/ 0, +/*probingRebalanceIntervalMs*/ 60 * 1000L +); + +private final AssignmentConfigs configWithStandbys = new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*balanceFactor*/ 1, +/*maxWarmupReplicas*/ 2, +/*numStandbyReplicas*/ 1, +/*probingRebalanceIntervalMs*/ 60 * 1000L +); -@Test -public void shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() { -client1 = EasyMock.createNiceMock(ClientState.class); -expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0)); -expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS); -replay(client1); -allTasks = mkSet(TASK_0_0, TASK_0_1); -clientStates = singletonMap(UUID_1, client1); -createTaskAssignor(); -assertFalse(taskAssignor.previousAssignmentIsValid()); Review comment: Or just remove it completely
[GitHub] [kafka] ableegoldman commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
ableegoldman commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416809469 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java ## @@ -163,8 +159,10 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception { // Assert that all messages in the second batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); Review comment: `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` still failed on [one of the builds](https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/OptimizedKTableIntegrationTest/shouldApplyUpdatesToStandbyStore/) at this line :/ But, at least we got farther into the test before it failed so I'd say this is still an improvement
[jira] [Updated] (KAFKA-9923) Join window store duplicates can be compacted in changelog
[ https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9923: --- Fix Version/s: 2.6.0 > Join window store duplicates can be compacted in changelog > --- > > Key: KAFKA-9923 > URL: https://issues.apache.org/jira/browse/KAFKA-9923 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Critical > Fix For: 2.6.0 > > > Stream-stream joins use the regular `WindowStore` implementation but with > `retainDuplicates` set to true. To allow for duplicates while using the same > unique-key underlying stores we just wrap the key with an incrementing > sequence number before inserting it. > This wrapping occurs at the innermost layer of the store hierarchy, which > means the duplicates must first pass through the changelogging layer. At this > point the keys are still identical. So, we end up sending the records to the > changelog without distinct keys and therefore may lose the older of the > duplicates during compaction.
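The key wrapping the ticket describes can be sketched as follows. This mirrors only the idea of appending an incrementing sequence number to the serialized key so that otherwise-identical keys stay distinct; the real byte layout in Kafka Streams may differ. The bug is that the changelogging layer sits above this wrapper, so the changelog sees the still-identical unwrapped keys.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hedged sketch of retainDuplicates-style key wrapping: append a 32-bit
// incrementing sequence number to the serialized key. A stand-in for the
// innermost-store behavior described in the ticket, not Kafka's exact code.
class SeqKeyWrapper {
    private int seq = 0;

    byte[] wrap(byte[] rawKey) {
        return ByteBuffer.allocate(rawKey.length + Integer.BYTES)
                .put(rawKey)
                .putInt(seq++) // makes duplicates of the same key distinct
                .array();
    }

    public static void main(String[] args) {
        SeqKeyWrapper wrapper = new SeqKeyWrapper();
        byte[] k1 = wrapper.wrap("same-key".getBytes());
        byte[] k2 = wrapper.wrap("same-key".getBytes());
        // Distinct wrapped keys: log compaction cannot collapse them.
        System.out.println(Arrays.equals(k1, k2)); // prints false
    }
}
```

If the changelog records carried these wrapped keys instead of the raw ones, compaction could no longer drop the older duplicate.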
[jira] [Updated] (KAFKA-9923) Join window store duplicates can be compacted in changelog
[ https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9923: --- Priority: Blocker (was: Critical)
[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11
hachikuji commented on pull request #8562: URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749977 retest this please
[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11
hachikuji commented on pull request #8562: URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749686 ok to test
[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11
hachikuji commented on pull request #8562: URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749823 retest this please
[GitHub] [kafka] pan3793 commented on pull request #7112: KAFKA-8713: JsonConverter NULL Values are replaced by default values even in NULLABLE fields
pan3793 commented on pull request #7112: URL: https://github.com/apache/kafka/pull/7112#issuecomment-620746182 I did a new implementation at https://github.com/apache/kafka/pull/8575 following [KIP-581](https://cwiki.apache.org/confluence/display/KAFKA/KIP-581:+Value+of+optional+null+field+which+has+default+value); this PR is deprecated and I will close it soon.
[GitHub] [kafka] pan3793 opened a new pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null
pan3793 opened a new pull request #8575: URL: https://github.com/apache/kafka/pull/8575 https://issues.apache.org/jira/browse/KAFKA-8713 https://cwiki.apache.org/confluence/display/KAFKA/KIP-581:+Value+of+optional+null+field+which+has+default+value ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)