[jira] [Created] (KAFKA-9934) Information and doc update needed for support of AclAuthorizer when protocol is PLAINTEXT

2020-04-28 Thread kaushik srinivas (Jira)
kaushik srinivas created KAFKA-9934:
---

 Summary: Information and doc update needed for support of 
AclAuthorizer when protocol is PLAINTEXT
 Key: KAFKA-9934
 URL: https://issues.apache.org/jira/browse/KAFKA-9934
 Project: Kafka
  Issue Type: Improvement
  Components: security
Affects Versions: 2.4.1
Reporter: kaushik srinivas


Need information on the case where the protocol is PLAINTEXT for listeners in 
Kafka.

Does authorization apply when the protocol is PLAINTEXT?

If so, what would be used as the principal name for the authorization ACL 
validations?

There is no doc that describes this case.

This needs an information and documentation update.

Thanks,

kaushik.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-28 Thread Georgi Petkov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Georgi Petkov updated KAFKA-9921:
-
Summary: Caching is not working properly with WindowStateStore when 
retaining duplicates  (was: Caching is not working properly with 
WindowStateStore when rataining duplicates)

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> I'm using the current latest version 2.5.0, but this is not something new.
> I have a _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and the values *1, 2, 3, 4* in 
> that order, then reading them through the iterator yields the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior arises because the cache holds only the last value (it 
> can hold only one per key, since it's not aware of the retain-duplicates 
> option), and that value is read first, while the first entry from the RocksDB 
> iterator is skipped even though the values are different. This can be observed 
> (for version 2.5.0) in _AbstractMergedSortedCacheStoreIterator#next()_ lines 
> 95-97. The next 3 values are then read from the RocksDB iterator, so they are 
> as expected.
> As I said, the whole feature does not consider the _retainDuplicates_ option, 
> so there are other examples of incorrect behavior, like in 
> _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list, appending one value, and 
> writing it back). So I go for a _List<KeyValuePair>_ (via retained duplicates) 
> instead of a _KeyValuePair<Key, List<Value>>_. The whole use case is more 
> complex than that, so I use _#transformValues_ and state stores.
> The impact is that I can't use caching on my state stores. Others will get 
> incorrect behavior that may take a lot of time to discover and even more time 
> to repair the results.
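For readers trying to reproduce the setup above, here is a minimal, compilable 
sketch of the store wiring being described (store name, windows, and serdes are 
illustrative, not taken from the original report):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;

public class DuplicateWindowStoreExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // retainDuplicates = true: each put() adds a new entry instead of
        // overwriting the previous value for the same key and window.
        builder.addStateStore(
            Stores.windowStoreBuilder(
                Stores.persistentWindowStore(
                    "traces-store",            // store name (illustrative)
                    Duration.ofHours(1),       // retention period
                    Duration.ofMinutes(5),     // window size
                    true),                     // retainDuplicates
                Serdes.String(),
                Serdes.String())
                // caching + retained duplicates is the combination the report
                // identifies as broken
                .withCachingEnabled());
    }
}
```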



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9933) Need doc update on the AclAuthorizer when SASL_SSL is the protocol used.

2020-04-28 Thread kaushik srinivas (Jira)
kaushik srinivas created KAFKA-9933:
---

 Summary: Need doc update on the AclAuthorizer when SASL_SSL is the 
protocol used.
 Key: KAFKA-9933
 URL: https://issues.apache.org/jira/browse/KAFKA-9933
 Project: Kafka
  Issue Type: Improvement
  Components: security
Affects Versions: 2.4.1
Reporter: kaushik srinivas


Hello,

The documentation on the usage of the authorizer does not describe which 
principal is used when the listener protocol is SASL + SSL (SASL_SSL).

Suppose Kerberos and SSL are enabled together: will authorization be based on 
the Kerberos principal names or on the SSL certificate DN names?

There is no document covering this part of the use case.

This needs an information and documentation update.
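
For context, a broker configuration sketch of the combination being asked about 
(values are illustrative, not from the report):

```
# Listener uses SASL over SSL, so both a Kerberos principal and an SSL
# certificate DN are available for each connection.
listeners=SASL_SSL://:9093
sasl.enabled.mechanisms=GSSAPI
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# The open question: which of the two identities do ACL checks use by default?
```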

Thanks,

Kaushik.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on a change in pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd

2020-04-28 Thread GitBox


C0urante commented on a change in pull request #8554:
URL: https://github.com/apache/kafka/pull/8554#discussion_r417070806



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -281,9 +281,18 @@ private void readToLogEnd() {
 Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
 while (it.hasNext()) {
 Map.Entry<TopicPartition, Long> entry = it.next();
-if (consumer.position(entry.getKey()) >= entry.getValue())
+TopicPartition topicPartition = entry.getKey();
+Long endOffset = entry.getValue();

Review comment:
   Fair enough; added.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd

2020-04-28 Thread GitBox


C0urante commented on pull request #8554:
URL: https://github.com/apache/kafka/pull/8554#issuecomment-620992508


   Thanks @kkonstantine! These all seem like reasonable suggestions and I've 
applied them all. Ready for the next round when you have time.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #8582: KAFKA-9932: Don't load configs from ZK when the log has already been loaded

2020-04-28 Thread GitBox


ijuma opened a new pull request #8582:
URL: https://github.com/apache/kafka/pull/8582


   If a broker contains 8k replicas, we would previously issue 8k ZK calls to 
retrieve topic
   configs when processing the first LeaderAndIsr request. That should 
translate to 0 after
   these changes.
   
   Credit to @junrao for identifying the problem.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9932) First LeaderAndIsrRequest can take long due to unnecessary ZK read

2020-04-28 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-9932:
--

 Summary: First LeaderAndIsrRequest can take long due to 
unnecessary ZK read
 Key: KAFKA-9932
 URL: https://issues.apache.org/jira/browse/KAFKA-9932
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma


[~junrao] found the following issue:
{quote}In Partition, we have the following code. fetchLogConfig is passed in to 
logManager.getOrCreateLog by value. This can increase the processing time for 
the very first LeaderAndIsrRequest since every partition has to do a ZK read to 
load the log config. This is unnecessary if the log is always present and 
loaded during the initialization of LogManager.
{quote}
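
The code in question is Scala, but the gist of the report is that an expensive 
config lookup is evaluated eagerly even when its result is never needed. A 
minimal Java sketch of the pattern and the lazy alternative (all names 
hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class LogManagerSketch {
    private final Map<String, Object> currentLogs = new ConcurrentHashMap<>();

    // Eager version: the caller performs the ZK read to build `config` before
    // calling, even when the log already exists and the value is unused.
    Object getOrCreateLogEager(String partition, Object config) {
        return currentLogs.computeIfAbsent(partition, p -> createLog(p, config));
    }

    // Lazy version: the Supplier is only invoked on the create path, so
    // already-loaded logs trigger no ZK read at all.
    Object getOrCreateLogLazy(String partition, Supplier<Object> configSupplier) {
        return currentLogs.computeIfAbsent(partition, p -> createLog(p, configSupplier.get()));
    }

    private Object createLog(String partition, Object config) {
        return new Object(); // stands in for real log creation
    }
}
```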



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9932) First LeaderAndIsrRequest can take long due to unnecessary ZK read

2020-04-28 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-9932:
---
Fix Version/s: 2.6.0

> First LeaderAndIsrRequest can take long due to unnecessary ZK read
> --
>
> Key: KAFKA-9932
> URL: https://issues.apache.org/jira/browse/KAFKA-9932
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.6.0
>
>
> [~junrao] found the following issue:
> {quote}In Partition, we have the following code. fetchLogConfig is passed in 
> to logManager.getOrCreateLog by value. This can increase the processing time 
> for the very first LeaderAndIsrRequest since every partition has to do a ZK 
> read to load the log config. This is unnecessary if the log is always present 
> and loaded during the initialization of LogManager.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on a change in pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd

2020-04-28 Thread GitBox


kkonstantine commented on a change in pull request #8554:
URL: https://github.com/apache/kafka/pull/8554#discussion_r417055199



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -281,9 +281,18 @@ private void readToLogEnd() {
 Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
 while (it.hasNext()) {
 Map.Entry<TopicPartition, Long> entry = it.next();
-if (consumer.position(entry.getKey()) >= entry.getValue())
+TopicPartition topicPartition = entry.getKey();
+Long endOffset = entry.getValue();
+long lastConsumedOffset = consumer.position(topicPartition);
+if (lastConsumedOffset >= endOffset) {
+log.trace("Reached end offset {} for {}", endOffset, 
topicPartition);
 it.remove();
-else {
+} else {
+log.trace(
+"Behind end offset {} for {}; last-consumed offset is 
{}",
+endOffset,
+topicPartition,
+lastConsumedOffset);

Review comment:
   ```suggestion
   log.trace("Behind end offset {} for {}; last-consumed 
offset is {}",
   endOffset, topicPartition, lastConsumedOffset);
   ```
   nit: multiline calls don't need to be on their own line in AK and tab is 
equal to 4 spaces (here we need 2 tabs)

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -281,9 +281,18 @@ private void readToLogEnd() {
 Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
 while (it.hasNext()) {
 Map.Entry<TopicPartition, Long> entry = it.next();
-if (consumer.position(entry.getKey()) >= entry.getValue())
+TopicPartition topicPartition = entry.getKey();
+Long endOffset = entry.getValue();

Review comment:
   Unboxing will happen in the comparison in the `if` branch anyway, so it's 
probably better to do it early by declaring the type `long` here. 
   
   

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -281,9 +281,18 @@ private void readToLogEnd() {
 Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
 while (it.hasNext()) {
 Map.Entry<TopicPartition, Long> entry = it.next();
-if (consumer.position(entry.getKey()) >= entry.getValue())
+TopicPartition topicPartition = entry.getKey();
+Long endOffset = entry.getValue();
+long lastConsumedOffset = consumer.position(topicPartition);
+if (lastConsumedOffset >= endOffset) {
+log.trace("Reached end offset {} for {}", endOffset, 
topicPartition);

Review comment:
   given that the previous messages say "Reading to ..." maybe it would 
make sense to say:
   ```suggestion
   log.trace("Read to end offset {} for {}", endOffset, 
topicPartition);
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -281,9 +281,18 @@ private void readToLogEnd() {
 Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
 while (it.hasNext()) {
 Map.Entry<TopicPartition, Long> entry = it.next();
-if (consumer.position(entry.getKey()) >= entry.getValue())
+TopicPartition topicPartition = entry.getKey();
+Long endOffset = entry.getValue();
+long lastConsumedOffset = consumer.position(topicPartition);
+if (lastConsumedOffset >= endOffset) {
+log.trace("Reached end offset {} for {}", endOffset, 
topicPartition);
 it.remove();
-else {
+} else {
+log.trace(
+"Behind end offset {} for {}; last-consumed offset is 
{}",
+endOffset,
+topicPartition,
+lastConsumedOffset);

Review comment:
   Similar to the above, seeing a message that says `read` might be easier 
to read in context than `consumed`. 
   How about: 
   `Behind end offset {} for {}; last-read offset is {}`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620976620


   Cherry-picked to 2.5 as 9e2785fd1ba0ed16604e01058bae6b60ff9f3d96



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #8581: MINOR: Fix typo and rephrase content in docs

2020-04-28 Thread GitBox


showuon opened a new pull request #8581:
URL: https://github.com/apache/kafka/pull/8581


   1. fix typo: `atleast` -> `at least`
   2. add missing `--` to be consistent
   3. rephrase a sentence to make it clearer:
   
   before: `LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a 
newer version) with the G1 collector`
   
   It could mislead users into running JDK 1.8 u5, while JDK 1.8 u251 has 
already been released and includes many important bug fixes. I rephrased it 
as below:
   
   after: `At the time when we write this, LinkedIn is running JDK 1.8 u5 
(looking to upgrade to a newer version) with the G1 collector`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620973163


   Cherry-pick for 2.5 in progress...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed

2020-04-28 Thread GitBox


kkonstantine commented on pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#issuecomment-620971604


   JDK8 build failed on a relevant test: 
   
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2007/testReport/junit/org.apache.kafka.connect.runtime/WorkerTest/testStartAndStopConnector/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

2020-04-28 Thread GitBox


vvcephei commented on pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#issuecomment-620971180


   Thanks, @arkins ! Shame is a powerful motivator :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#issuecomment-620970911


   Thanks for the review, @guozhangwang . I've addressed (or responded to) your 
comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r417047054



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -106,7 +108,7 @@ public void before() {
 
 consumerConfiguration = new Properties();
 
consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
name.getMethodName() + "-consumer");
+consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
safeTestName + "-consumer");

Review comment:
   ```suggestion
   consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"group-" + safeTestName);
   ```
   
   No, I just got tired of messing with every test's idiosyncrasies.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r417046624



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
##
@@ -156,7 +157,7 @@ public void 
shouldPreservePartitionTimeOnKafkaStreamRestart() {
 assertThat(lastRecordedTimestamp, is(5000L));
 } finally {
 kafkaStreams.close();
-cleanStateAfterTest(CLUSTER, kafkaStreams);
+quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);

Review comment:
   Unfortunately, we generally can't use try-with-resources for these 
tests, since that puts the `kafkaStreams` reference out of scope for the 
finally block. We'd have to let a reference to kafkaStreams escape the try 
{} block so we could reference it either in finally or in an After method, 
which is just as messy as it currently is, if not more. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8579: KAFKA-9930: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.

2020-04-28 Thread GitBox


ijuma commented on a change in pull request #8579:
URL: https://github.com/apache/kafka/pull/8579#discussion_r417044882



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -382,6 +382,11 @@ abstract class AbstractFetcherThread(name: String,
 "that the partition is being moved")
   partitionsWithError += topicPartition
 
+case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+  warn(s"Remote broker does not host the partition 
$topicPartition, which could indicate " +
+"that the partition is being created or deleted.")

Review comment:
   Should it be `info` if we think this is expected? That would still show 
in the logs.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r417044713



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final 
EmbeddedKafkaCluster cluster, fina
 }
 }
 
-public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, 
final KafkaStreams driver) {
-driver.cleanUp();
+public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster 
cluster, final KafkaStreams driver) {
 try {
+driver.cleanUp();
 cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
-} catch (final InterruptedException e) {
-throw new RuntimeException(e);
+} catch (final RuntimeException | InterruptedException e) {
+LOG.warn("Ignoring failure to clean test state", e);
 }

Review comment:
   I share your concern, but I'm not sure about the conclusion.
   
   Yes, if there is state (such as a topic) that leaks from one test to the 
next, it can certainly cause difficult-to-debug failures. However, there are 
multiple things we can do to prevent/mitigate it:
   * delete state after tests (not to leave any garbage behind)
   * delete state before the tests (to ensure a clean slate for the test)
   * choose unique names for all resources of each test (this is where the 
other part of this PR comes in)
   
   Any one of these should be sufficient to prevent state from leaking in 
between tests, and most of these tests do all three. In other words, we have 3x 
redundancy guarding against such test pollution. If you look at all three of 
these measures, the clean up _after_ tests is actually the most optional, since 
tests can't tolerate failures in the clean up _before_ (because it also creates 
necessary topics), and choosing unique topic names per test is bulletproof and 
easy to fix (once we know what the problem is).
   
   Whether the cleanup is part of the test or in the `@After` method, the 
outcome is the same, if the method throws an exception, the test will fail. The 
downside of After is that it requires you to store the topic names in mutable 
class-level fields, which actually makes it more awkward to choose unique names 
per test.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8442: KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses

2020-04-28 Thread GitBox


kkonstantine commented on pull request #8442:
URL: https://github.com/apache/kafka/pull/8442#issuecomment-620964908


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8442: KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses

2020-04-28 Thread GitBox


kkonstantine commented on pull request #8442:
URL: https://github.com/apache/kafka/pull/8442#issuecomment-620964776


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r417040202



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##
@@ -215,13 +216,13 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
 }
 
 private Properties streamsConfiguration() {
-final String applicationId = "streamsApp";
+final String safeTestName = safeUniqueTestName(getClass(), testName);
 final Properties config = new Properties();
 config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.OPTIMIZE);
-config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + 
name.getMethodName());
+config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
 config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + 
(++port));
 config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
-config.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(applicationId).getPath());
+config.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());

Review comment:
   I wavered on this point, but each time you call `tempDirectory`, it 
should give you a completely new directory:
   ```
* Create a temporary relative directory in the default temporary-file 
directory with the given prefix.
*
* @param prefix The prefix of the temporary directory, if null using 
"kafka-" as default prefix
   ```
   
   So the prefix seems to be nice only for documenting which test a directory 
is for, not for enforcing any kind of test/directory uniqueness. I felt like it 
added more noise than value, so I just dropped all the prefixes. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8580: KAFKA-9832: fix attempt to commit non-running tasks

2020-04-28 Thread GitBox


vvcephei commented on pull request #8580:
URL: https://github.com/apache/kafka/pull/8580#issuecomment-620962357


   Hey @mjsax , do you have time for a quick review?
   
   This bug seems to have been introduced by 
https://github.com/apache/kafka/pull/8440/files#r407722022 , which attempts to 
commit all non-corrupted tasks. Some of these tasks may not be running. The 
Task implementations will throw an exception if we attempt to `prepareCommit` 
while not in state RUNNING (or RESTORING).
   
   We could make the task more permissive, so that it would ignore the commit 
to a task that is not in a committable state. I opted instead to filter out 
only the tasks in committable states, though. I was concerned that if we make 
prepareCommit more permissive, we might just complicate the rest of the commit 
lifecycle, because then the rest of it would also have to be permissive, etc.
   
   Thanks for the very nice test in your prior PR; it was easy to extend it to 
cover this case and also to add the regression test.
   
   WDYT?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #8580: KAFKA-9832: fix attempt to commit non-running tasks

2020-04-28 Thread GitBox


vvcephei opened a new pull request #8580:
URL: https://github.com/apache/kafka/pull/8580


   Fixes an attempt to commit potentially non-running tasks while recovering 
from task corruption.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9931) Kafka Connect should accept '-1' as a valid replication factor

2020-04-28 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094975#comment-17094975
 ] 

Randall Hauch edited comment on KAFKA-9931 at 4/29/20, 1:52 AM:


We'll need a KIP for this change, but it should be relatively straightforward 
if we don't change the default or the config files, and merely accept values 
that are either -1 or >= 1.

However, I do think this is a good time for Connect's worker logic to support 
other topic settings when creating internal topics, via worker config 
properties using the {{config.storage.topic.*}}, {{offset.storage.topic.*}}, 
and {{status.storage.topic.*}} prefixes.


was (Author: rhauch):
We'll need a KIP for this change, but it should be relatively straightforward 
if we don't change the default or the config files, and merely accept values 
that are either -1 or >= 1.

However, I do think this is a good time for Connect's worker logic to support 
other topic settings when creating internal topics, via worker config 
properties using the `config.storage.topic.*`, `offset.storage.topic.*`, and 
`status.storage.topic.*` prefixes.

> Kafka Connect should accept '-1' as a valid replication factor
> --
>
> Key: KAFKA-9931
> URL: https://issues.apache.org/jira/browse/KAFKA-9931
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: needs-kip
> Fix For: 2.6.0
>
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic]
> As of KIP-464, the admin client can use '-1' for the replication factor or 
> partition count to use the broker defaults. The Kafka Connect framework does 
> not currently accept anything less than 1 as a valid replication factor. This 
> should be changed so that Connect worker configurations can specify `-1` for 
> the internal topic replication factors, defaulting to the broker's default 
> replication factor for new topics.
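
For illustration, a Connect worker configuration sketch of what this change 
would allow (the keys are the existing worker settings; the `-1` values are 
what the ticket proposes to accept):

```
# Use the broker's default replication factor for all three internal topics.
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
```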



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9931) Kafka Connect should accept '-1' as a valid replication factor

2020-04-28 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094975#comment-17094975
 ] 

Randall Hauch edited comment on KAFKA-9931 at 4/29/20, 1:51 AM:


We'll need a KIP for this change, but it should be relatively straightforward 
if we don't change the default or the config files, and merely accept values 
that are either -1 or >= 1.

However, I do think this is a good time for Connect's worker logic to support 
other topic settings when creating internal topics, via worker config 
properties using the `config.storage.topic.*`, `offset.storage.topic.*`, and 
`status.storage.topic.*` prefixes.


was (Author: rhauch):
We'll need a KIP for this change, but it should be relatively straightforward 
if we don't change the default or the config files, and merely accept values 
that are either -1 or >= 1.

> Kafka Connect should accept '-1' as a valid replication factor
> --
>
> Key: KAFKA-9931
> URL: https://issues.apache.org/jira/browse/KAFKA-9931
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: needs-kip
> Fix For: 2.6.0
>
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic]
> As of KIP-464, the admin client can use '-1' for the replication factor or 
> partition count to use the broker defaults. The Kafka Connect framework does 
> not currently accept anything less than 1 as a valid replication factor. This 
> should be changed so that Connect worker configurations can specify `-1` for 
> the internal topic replication factors, defaulting to the broker's default 
> replication factor for new topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9931) Kafka Connect should accept '-1' as a valid replication factor

2020-04-28 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094975#comment-17094975
 ] 

Randall Hauch commented on KAFKA-9931:
--

We'll need a KIP for this change, but it should be relatively straightforward 
if we don't change the default or the config files, and merely accept values 
that are either -1 or >= 1.

> Kafka Connect should accept '-1' as a valid replication factor
> --
>
> Key: KAFKA-9931
> URL: https://issues.apache.org/jira/browse/KAFKA-9931
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: needs-kip
> Fix For: 2.6.0
>
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic]
> As of KIP-464, the admin client can use '-1' for the replication factor or 
> partition count to use the broker defaults. The Kafka Connect framework does 
> not currently accept anything less than 1 as a valid replication factor. This 
> should be changed so that Connect worker configurations can specify `-1` for 
> the internal topic replication factors, defaulting to the broker's default 
> replication factor for new topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9931) Kafka Connect should accept '-1' as a valid replication factor

2020-04-28 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9931:
-
Labels: needs-kip  (was: )

> Kafka Connect should accept '-1' as a valid replication factor
> --
>
> Key: KAFKA-9931
> URL: https://issues.apache.org/jira/browse/KAFKA-9931
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: needs-kip
> Fix For: 2.6.0
>
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic]
> As of KIP-464, the admin client can use '-1' for the replication factor or 
> partition count to use the broker defaults. The Kafka Connect framework does 
> not currently accept anything less than 1 as a valid replication factor. This 
> should be changed so that Connect worker configurations can specify `-1` for 
> the internal topic replication factors, defaulting to the broker's default 
> replication factor for new topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9931) Kafka Connect should accept '-1' as a valid replication factor

2020-04-28 Thread Randall Hauch (Jira)
Randall Hauch created KAFKA-9931:


 Summary: Kafka Connect should accept '-1' as a valid replication 
factor
 Key: KAFKA-9931
 URL: https://issues.apache.org/jira/browse/KAFKA-9931
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.5.0
Reporter: Randall Hauch
Assignee: Randall Hauch
 Fix For: 2.6.0


[https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic]

As of KIP-464, the admin client can use '-1' for the replication factor or 
partition count to use the broker defaults. The Kafka Connect framework does 
not currently accept anything less than 1 as a valid replication factor. This 
should be changed so that Connect worker configurations can specify `-1` for 
the internal topic replication factors, defaulting to the broker's default 
replication factor for new topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] efeg opened a new pull request #8579: KAFKA-9930: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.

2020-04-28 Thread GitBox


efeg opened a new pull request #8579:
URL: https://github.com/apache/kafka/pull/8579


   When does UnknownTopicOrPartitionException typically occur?
* Upon a topic creation, a follower broker of a new partition starts 
replica fetcher before the prospective leader broker of the new partition 
receives the leadership information from the controller (see 
[KAFKA-6221](https://issues.apache.org/jira/browse/KAFKA-6221)).
* Upon a topic deletion, a follower broker of a to-be-deleted partition 
starts replica fetcher after the leader broker of the to-be-deleted partition 
processes the deletion information from the controller.
* As expected, clusters with frequent topic creation and deletion report 
UnknownTopicOrPartitionException with relatively higher frequency.
   
   What is the impact?
* Exception tracking systems identify the error logs with 
UnknownTopicOrPartitionException as an exception. This results in a lot of 
noise for a transient issue that is expected to recover by itself and is a 
natural part of Kafka's asynchronous state propagation.
   
   Why not move it to a log level lower than warn?
* Despite typically being a transient issue, 
UnknownTopicOrPartitionException may also indicate real issues if it doesn't 
fix itself after a short period of time. To ensure detection of such scenarios, 
we set the log level to warn.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9930) Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.

2020-04-28 Thread Adem Efe Gencer (Jira)
Adem Efe Gencer created KAFKA-9930:
--

 Summary: Prevent ReplicaFetcherThread from throwing 
UnknownTopicOrPartitionException upon topic creation and deletion.
 Key: KAFKA-9930
 URL: https://issues.apache.org/jira/browse/KAFKA-9930
 Project: Kafka
  Issue Type: Bug
  Components: logging
Affects Versions: 2.5.0, 2.4.0, 2.3.0, 2.2.0, 2.1.0, 2.0.0, 1.1.0, 1.0.0, 
0.11.0.0, 0.10.0.0
Reporter: Adem Efe Gencer
Assignee: Adem Efe Gencer


When does UnknownTopicOrPartitionException typically occur?
 * Upon a topic creation, a follower broker of a new partition starts replica 
fetcher before the prospective leader broker of the new partition receives the 
leadership information from the controller. Apache Kafka has an open issue 
about this (see KAFKA-6221).
 * Upon a topic deletion, a follower broker of a to-be-deleted partition starts 
replica fetcher after the leader broker of the to-be-deleted partition 
processes the deletion information from the controller.
 * As expected, clusters with frequent topic creation and deletion report 
UnknownTopicOrPartitionException with relatively higher frequency.

What is the impact?
 * Exception tracking systems identify the error logs with 
UnknownTopicOrPartitionException as an exception. This results in a lot of 
noise for a transient issue that is expected to recover by itself and is a 
natural part of Kafka's asynchronous state propagation.

Why not move it to a log level lower than warn?
 * Despite typically being a transient issue, UnknownTopicOrPartitionException 
may also indicate real issues if it doesn't fix itself after a short period of 
time. To ensure detection of such scenarios, we set the log level to warn.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r417015562



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##
@@ -215,13 +216,13 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
 }
 
 private Properties streamsConfiguration() {
-final String applicationId = "streamsApp";
+final String safeTestName = safeUniqueTestName(getClass(), testName);
 final Properties config = new Properties();
 config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.OPTIMIZE);
-config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + 
name.getMethodName());
+config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
 config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + 
(++port));
 config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
-config.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(applicationId).getPath());
+config.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());

Review comment:
   Is it safer to encode the appID as part of the dir path to avoid 
collision?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final 
EmbeddedKafkaCluster cluster, fina
 }
 }
 
-public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, 
final KafkaStreams driver) {
-driver.cleanUp();
+public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster 
cluster, final KafkaStreams driver) {
 try {
+driver.cleanUp();
 cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
-} catch (final InterruptedException e) {
-throw new RuntimeException(e);
+} catch (final RuntimeException | InterruptedException e) {
+LOG.warn("Ignoring failure to clean test state", e);
 }

Review comment:
   req: Actually, deleting topics after a test is critical for some tests: 
I've encountered cases where the same topics are mistakenly reused across 
different test cases within a single class. But I feel it is better to 
put the topic deletion in the `@After` function while leaving `cleanUp()` as 
part of the test function itself.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
##
@@ -156,7 +157,7 @@ public void 
shouldPreservePartitionTimeOnKafkaStreamRestart() {
 assertThat(lastRecordedTimestamp, is(5000L));
 } finally {
 kafkaStreams.close();
-cleanStateAfterTest(CLUSTER, kafkaStreams);
+quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);

Review comment:
   nit: we can put kafkaStreams in a try block.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -106,7 +108,7 @@ public void before() {
 
 consumerConfiguration = new Properties();
 
consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
name.getMethodName() + "-consumer");
+consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
safeTestName + "-consumer");

Review comment:
   Somewhere else it is set as `"group-" + safeTestName`, is this change 
intentional?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
##
@@ -243,7 +251,7 @@ public void shouldRecoverBufferAfterShutdown() {
 
 } finally {
 driver.close();
-cleanStateAfterTest(CLUSTER, driver);
+quietlyCleanStateAfterTest(CLUSTER, driver);

Review comment:
   nit: ditto here, we can put `driver` in the try block. And ditto 
elsewhere.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gwenshap commented on pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-28 Thread GitBox


gwenshap commented on pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#issuecomment-620932278


   No failures, nice :)
   Great update @lbradstreet and thanks for contributing.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-04-28 Thread GitBox


bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r417002126



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
 Checkpoint.unwrapGroup(record.sourcePartition()),
 System.currentTimeMillis() - record.timestamp());
 }
+
+private void refreshIdleConsumerGroupOffset() {
+Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+.describeConsumerGroups(consumerGroups).describedGroups();
+
+for (String group : consumerGroups) {
+try {
+if (consumerGroupsDesc.get(group) == null) {
+// if consumerGroupsDesc does not contain this group, it 
should be the new consumer
+// group created at source cluster and its offsets should 
be sync-ed to target
+newConsumerGroup.add(group);
+continue;
+}
+ConsumerGroupDescription consumerGroupDesc = 
consumerGroupsDesc.get(group).get();
+// sync offset to the target cluster only if the state of 
current consumer group is idle or dead
+ConsumerGroupState consumerGroupState = 
consumerGroupDesc.state();
+if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || 
consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+idleConsumerGroupsOffset.put(group, 
targetAdminClient.listConsumerGroupOffsets(group)
+.partitionsToOffsetAndMetadata().get().entrySet());
+}
+} catch (InterruptedException | ExecutionException e) {
+log.error("Error querying for consumer group {} on cluster 
{}.", group, targetClusterAlias, e);
+}
+}
+}
+
+Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+// first, sync offsets for the idle consumers at target
+for (Map.Entry<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+String consumerGroupId = group.getKey();
+// for each idle consumer at target, read the checkpoints (converted
+// upstream offset) from the pre-populated map
+Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+if (convertedUpstreamOffset == null) continue;
+
+Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {

Review comment:
   Yes





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-04-28 Thread GitBox


ning2008wisc commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r416995009



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
 Checkpoint.unwrapGroup(record.sourcePartition()),
 System.currentTimeMillis() - record.timestamp());
 }
+
+private void refreshIdleConsumerGroupOffset() {
+Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+.describeConsumerGroups(consumerGroups).describedGroups();
+
+for (String group : consumerGroups) {
+try {
+if (consumerGroupsDesc.get(group) == null) {
+// if consumerGroupsDesc does not contain this group, it 
should be the new consumer
+// group created at source cluster and its offsets should 
be sync-ed to target
+newConsumerGroup.add(group);
+continue;
+}
+ConsumerGroupDescription consumerGroupDesc = 
consumerGroupsDesc.get(group).get();
+// sync offset to the target cluster only if the state of 
current consumer group is idle or dead
+ConsumerGroupState consumerGroupState = 
consumerGroupDesc.state();
+if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || 
consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+idleConsumerGroupsOffset.put(group, 
targetAdminClient.listConsumerGroupOffsets(group)
+.partitionsToOffsetAndMetadata().get().entrySet());
+}
+} catch (InterruptedException | ExecutionException e) {
+log.error("Error querying for consumer group {} on cluster 
{}.", group, targetClusterAlias, e);
+}
+}
+}
+
+Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+// first, sync offsets for the idle consumers at target
+for (Map.Entry<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+String consumerGroupId = group.getKey();
+// for each idle consumer at target, read the checkpoints (converted
+// upstream offset) from the pre-populated map
+Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+if (convertedUpstreamOffset == null) continue;
+
+Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {

Review comment:
   If I am understanding right, are you asking about this scenario: 
consumer A is consuming from Topic `x` and `y`, and MM is replicating the 
offset of consumer A for Topic `x` and `y`. What if consumer A starts consuming 
from Topic `x`, `y` and `z`, where `z` is a new topic - why does MM not 
replicate the offset of consumer A for Topic `z`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zhaohaidao commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

2020-04-28 Thread GitBox


zhaohaidao commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-620915590


   @abbccdda Hi, pr updated. Could you continue to review it?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


ableegoldman commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620909428


   One unrelated failure: 
`MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9929) Support reverse iterator on WindowStore

2020-04-28 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-9929:
---

 Summary: Support reverse iterator on WindowStore
 Key: KAFKA-9929
 URL: https://issues.apache.org/jira/browse/KAFKA-9929
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Jorge Esteban Quilcate Otoya


Currently, WindowStore fetch operations return an iterator sorted from earliest 
to latest result:

```

* For each key, the iterator guarantees ordering of windows, starting from the 
oldest/earliest available window to the newest/latest window.

```

 

We have a use case where traces are stored in a WindowStore, and we use 
Kafka Streams to create a materialized view of traces. A query request 
comes with a time range (e.g. now-1h to now) and wants the most recent 
results: fetch this period of time, iterate and pattern-match the latest/most 
recent traces, and if there are enough results, reply without moving further 
along the iterator.

The same store is used to search for previous traces. In this case, we search a 
key for the last day; if traces are found, we would also like to iterate from 
the most recent.

RocksDB seems to support iterating backward and forward: 
[https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound]

 

For reference: this extracts some bits from a previous issue: 
https://issues.apache.org/jira/browse/KAFKA-4212:

 

> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp. But 
> this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.

 

I would like to know if there is any impediment on the RocksDB or WindowStore 
side to supporting this.

Adding an argument to reverse the iteration order in the current fetch methods 
would be great:

```

WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD)

```
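
As a usage sketch of the proposal (everything here is hypothetical; no 
`Direction` argument exists in the current WindowStore interface):

```java
// Hypothetical: fetch the last hour for a key, iterate newest-first, and stop
// as soon as enough matches have been collected.
Instant now = Instant.now();
try (WindowStoreIterator<String> iter =
         store.fetch("trace-key", now.minus(Duration.ofHours(1)), now, Direction.BACKWARD)) {
    List<String> results = new ArrayList<>();
    while (iter.hasNext() && results.size() < 10) {
        results.add(iter.next().value);
    }
}
```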



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r416969713



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
##
@@ -101,8 +102,8 @@ public void before() throws Exception {
 builder = new StreamsBuilder();
 createTopics();
 streamsConfiguration = new Properties();
-final String applicationId = "global-thread-shutdown-test" + 
testName.getMethodName();
-streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
+final String safeTestName = safeUniqueTestName(getClass(), testName);
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);

Review comment:
   I've standardized all the usages to be just "app", followed by the 
generated name, since the generated name contains the same information that we 
previously hand-wrote into the prefix or suffix. All we really need to do is 
ensure that the app id won't collide with a group name that we might use in a 
verification consumer, for example. For that reason, I've never used the 
generated name "plain", but always scoped it to the usage (app id, group id, 
input topic, etc.).
   
   It's not super important to apply these ideas universally, but I felt it 
would make it easier to write more tests like it in the future if I just made a 
full pass on all the tests to make them all look the same.
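
   For reference, a sketch of what such a name-sanitizing helper could look 
like (an illustrative approximation, not the actual `safeUniqueTestName` 
implementation):
   ```java
   static String safeUniqueTestName(Class<?> testClass, org.junit.rules.TestName testName) {
       // class + method identifies the test; a random suffix keeps reruns unique;
       // sanitize so the result is legal in topic names, group ids, and paths.
       return (testClass.getSimpleName() + "-" + testName.getMethodName() + "-"
               + java.util.UUID.randomUUID().toString().substring(0, 8))
           .replaceAll("[^a-zA-Z0-9._-]", "-");
   }
   ```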

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -88,16 +89,17 @@
 private String stateStoreName;
 
 @Rule
-public TestName name = new TestName();
+public TestName testName = new TestName();
 
 @Before
 public void before() {
-inputTopicName = "input-topic-" + name.getMethodName();
-outputTopicName = "output-topic-" + name.getMethodName();
-stateStoreName = "lagfetch-test-store" + name.getMethodName();
+final String safeTestName = safeUniqueTestName(getClass(), testName);
+inputTopicName = "input-topic-" + safeTestName;
+outputTopicName = "output-topic-" + safeTestName;
+stateStoreName = "lagfetch-test-store" + safeTestName;
 
 streamsConfiguration = new Properties();
-streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"lag-fetch-" + name.getMethodName());
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"lag-fetch-" + safeTestName);

Review comment:
   ```suggestion
   streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" 
+ safeTestName);
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final 
EmbeddedKafkaCluster cluster, fina
 }
 }
 
-public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, 
final KafkaStreams driver) {
-driver.cleanUp();
+public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster 
cluster, final KafkaStreams driver) {
 try {
+driver.cleanUp();
 cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
-} catch (final InterruptedException e) {
-throw new RuntimeException(e);
+} catch (final RuntimeException | InterruptedException e) {
+LOG.warn("Ignoring failure to clean test state", e);
 }

Review comment:
   This is really the fix for KAFKA-9875. The other change just hopefully 
reduces the probability that ignoring the exceptions could cause subsequent 
failures (e.g., if the topics don't get deleted before the next test, at least 
the next one will have different topic names).
   
   I've verified that all usages of this method are ok to ignore potential exceptions. Namely, as long as the test logic itself doesn't need to ensure that any topics got deleted, and as long as this call is the last line in the test method, it should be fine just to ignore failures here.
   
   I also considered just deleting the method, but if it does succeed, then it 
leaves less garbage around for subsequent tests, so it feels better to at least 
attempt a cleanup.

##
File path: streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
##
@@ -106,7 +106,9 @@ public static void 
startKafkaStreamsAndWaitForRunningState(final KafkaStreams ka
 kafkaStreams.start();
 assertThat(
 "KafkaStreams did not transit to RUNNING state within " + 
timeoutMs + " milli seconds.",
-countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS), 
equalTo(true));
+countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS),
+equalTo(true)
+);

Review comment:
   just fixing the formatting.





[GitHub] [kafka] vvcephei opened a new pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei opened a new pull request #8578:
URL: https://github.com/apache/kafka/pull/8578


   The ticket is for a flaky test that failed to clean up topics _after_ the 
test, which
   isn't strictly necessary for test success.
   
   * alter the "clean up after test" method to never throw an exception
     (after verifying it's always the last invocation inside a finally block,
     so it won't break any test semantics)
   * consolidate the naming of all integration tests' app ids, topics, etc., by introducing
     a new test utility to generate safe, unique, descriptive names.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] steverod commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-28 Thread GitBox


steverod commented on pull request #8542:
URL: https://github.com/apache/kafka/pull/8542#issuecomment-620893016


   retest this please
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #8569: KIP-551: Expose disk read and write metrics

2020-04-28 Thread GitBox


cmccabe commented on a change in pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#discussion_r416961523



##
File path: core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
##
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.kafka.common.utils.Time
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Retrieves Linux /proc/self/io metrics.
+ */
+class LinuxIoMetricsCollector(val procPath: String, val time: Time, val 
logger: Logger) {
+  import LinuxIoMetricsCollector._
+  var lastUpdateMs = -1L
+  var cachedReadBytes = 0L
+  var cachedWriteBytes = 0L
+
+  def readBytes(): Long = this.synchronized {
+val curMs = time.milliseconds()
+if (curMs != lastUpdateMs) {
+  updateValues(curMs)
+}
+cachedReadBytes
+  }
+
+  def writeBytes(): Long = this.synchronized {
+val curMs = time.milliseconds()
+if (curMs != lastUpdateMs) {
+  updateValues(curMs)
+}
+cachedWriteBytes
+  }
+
+  /**
+   * Read /proc/self/io.
+   *
+   * Generally, each line in this file contains a prefix followed by a colon 
and a number.
+   *
+   * For example, it might contain this:
+   * rchar: 4052
+   * wchar: 0
+   * syscr: 13
+   * syscw: 0
+   * read_bytes: 0
+   * write_bytes: 0
+   * cancelled_write_bytes: 0
+   */
+  def updateValues(now: Long): Boolean = this.synchronized {

Review comment:
   Unless we choose to read this file in a background thread, there isn't a 
reason to avoid using a lock here.
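   For readers unfamiliar with the file format, a standalone Java sketch of reading one counter from /proc/self/io (illustrative only; it is not the Scala collector under review):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ProcIoSketch {
    // Returns the value of a field such as "read_bytes" or "write_bytes",
    // or -1 if the field is not present.
    static long readCounter(final String field) throws IOException {
        for (final String line : Files.readAllLines(Paths.get("/proc/self/io"))) {
            final String[] parts = line.split(":");
            if (parts.length == 2 && parts[0].trim().equals(field)) {
                return Long.parseLong(parts[1].trim());
            }
        }
        return -1L;
    }

    public static void main(final String[] args) throws IOException {
        System.out.println("read_bytes = " + readCounter("read_bytes"));
    }
}
```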





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8567: KAFKA-9652: Fix throttle metric in RequestChannel and request log due to KIP-219

2020-04-28 Thread GitBox


ijuma commented on pull request #8567:
URL: https://github.com/apache/kafka/pull/8567#issuecomment-620887867


   2 jobs passed, 1 unrelated flaky test failed:
   
   > 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-28 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094900#comment-17094900
 ] 

Georgi Petkov edited comment on KAFKA-9921 at 4/28/20, 10:23 PM:
-

[~ableegoldman]

Yeah, I agree that probably not much can be done in terms of caching (compared to the options without _retainDuplicates_).

I totally agree that many of the features, like the null-value behavior, are correct and make perfect sense from the point of view of the features implemented with them. Still, it's strange from the perspective of using the store standalone. *1-2 sentences clarifying the behavior with null values in the _WindowStateStore_ documentation could definitely help.* In addition, as I said, if this is the desired behavior *you can easily skip calling RocksDB for null values (when using _retainDuplicates_). This would both make the intention clearer and obviously avoid unnecessary calls.*

I do need exactly a stream-stream join, but without the repartition part: I want to get matches whenever there are new events in either stream, to support duplicate keys in the stream, and I use the _WindowStateStore_ only for its retention policy. In fact, due to the lack of examples, I was looking at the stream-stream join implementation to find out how to use the _WindowStateStores_ correctly. I'm building a library of common yet non-trivial stream operations, such as topological sorting, so I don't know whether the user will provide null values or not. I was curious about the behavior with null values so that I know what I'm providing to the user; I tested it, and that's how I found out the exact behavior.

*I'm not sure that an in-memory or any custom state store will solve it.* Yes, in-memory helps with the efficient append because it avoids any expensive call and serialization/deserialization. Nevertheless, *you will always have the serialization/deserialization somewhere, namely the changelog topic, where bandwidth matters too* (not just precious processing time). Even if the list is capped at, say, only 5 items, you will still have 15 (1 + 2 + 3 + 4 + 5) events recorded instead of 5; in general, rewriting the list on every append emits 1 + 2 + ... + n = n(n+1)/2 changelog records for a list of length n, so the size grows as O(n^2). Combined with the fact that I want to provide a library to many different users (and duplicate counts may vary a lot between usages), *to me it's best to implement it just as in the stream-stream join - with duplicates*. Still, it was a great discussion and made me more confident in my decisions. Thank you for your assistance.

*Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and 
_TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.*


was (Author: georgi.petkov):
[~ableegoldman]

Yeah, I agree that probably not much can be done in terms of caching (compared 
to the options without _retainDuplicates_).

I totally agree that many of the features like the null value behavior are 
correct and make perfect sense from point of view of the features implemented 
with it. Still, it's strange from the perspective where you use it standalone. 
*1-2 sentences clarifying the behavior with null values in the 
_WindowStateStore_ documentation could definitely help.* In addition, as I said 
if this is the desired behavior *you can easily skip calling RocksDB for null 
values (when using _retainDuplicates)_. This would make both the intention 
clearer and obviously avoid unnecessary calls.*

I do need exactly stream-stream join but without the repartition part. I want 
to get matches when there are new events in whichever stream and I also use 
_WindowStateStore_ only for the retention policy. In fact, due to the lack of 
many examples, I was looking at the stream-stream join implementation to find 
out how to correctly use the _WindowStateStores_. I'm building a library for 
some common yet not trivial at all operations on streams that you may need like 
topological sorting. Therefore I don't know if the user will provide null 
values or not. I was curious about the behavior with null values so I know what 
I'm providing to the user. I've tested it and that's how I found out what is 
the exact behavior.

*I'm not sure that an in-memory or any custom state store will make it.* Yes, 
in-memory will help with the efficient append because it avoids any expensive 
call and serializations/deserializations. Nevertheless, *you will always have 
the serializations/deserializations somewhere and this is the changelog topic 
and there you have also bandwidth* (not just precious processing time). Even if 
the list is fixed to let's say only 5 items you will still have 15 (1 + 2 + 3 + 
4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - 
O(n^2). Combined with the fact that I want to provide a library to many 
different users (and duplicates count may vary a lot 

[jira] [Commented] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]

2020-04-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094902#comment-17094902
 ] 

Guozhang Wang commented on KAFKA-9928:
--

I found that for the failed run, around the time when the producer of {{produceTopicValues(streamTopic);}} (around line 172) is being closed, the following entries are printed (whereas successful runs do not have them), cc [~mjsax]:

{code}
[2020-04-28 15:10:58,458] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Fetch offset 9 is out of range for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset 
(org.apache.kafka.clients.consumer.internals.Fetcher:1261)
[2020-04-28 15:10:58,458] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Resetting offset for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:383)
[2020-04-28 15:10:58,459] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Fetch offset 9 is out of range for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset 
(org.apache.kafka.clients.consumer.internals.Fetcher:1261)
[2020-04-28 15:10:58,460] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Resetting offset for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:383)
[2020-04-28 15:10:58,461] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Fetch offset 9 is out of range for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset 
(org.apache.kafka.clients.consumer.internals.Fetcher:1261)
[2020-04-28 15:10:58,461] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Resetting offset for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:383)
[2020-04-28 15:10:58,566] INFO [Producer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-producer,
 
transactionalId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-1]
 Discovered group coordinator localhost:54279 (id: 0 rack: null) 
(org.apache.kafka.clients.producer.internals.TransactionManager:1525)
[2020-04-28 15:11:00,740] INFO [Controller id=0] Processing automatic preferred 
replica leader election (kafka.controller.KafkaController:66)
{code}

Note that this CLUSTER only has one broker.

> Flaky 
> GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
> -
>
> Key: KAFKA-9928
> URL: https://issues.apache.org/jira/browse/KAFKA-9928
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>Priority: Major
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. waiting for 
> final values
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
>   at 

[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-28 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094900#comment-17094900
 ] 

Georgi Petkov commented on KAFKA-9921:
--

[~ableegoldman]

Yeah, I agree that probably not much can be done in terms of caching (compared 
to the options without _retainDuplicates_).

I totally agree that many of the features like the null value behavior are 
correct and make perfect sense from point of view of the features implemented 
with it. Still, it's strange from the perspective where you use it standalone. 
*1-2 sentences clarifying the behavior with null values in the 
_WindowStateStore_ documentation could definitely help.* In addition, as I said 
if this is the desired behavior *you can easily skip calling RocksDB for null 
values (when using _retainDuplicates)_. This would make both the intention 
clearer and obviously avoid unnecessary calls.*

I do need exactly stream-stream join but without the repartition part. I want 
to get matches when there are new events in whichever stream and I also use 
_WindowStateStore_ only for the retention policy. In fact, due to the lack of 
many examples, I was looking at the stream-stream join implementation to find 
out how to correctly use the _WindowStateStores_. I'm building a library for 
some common yet not trivial at all operations on streams that you may need like 
topological sorting. Therefore I don't know if the user will provide null 
values or not. I was curious about the behavior with null values so I know what 
I'm providing to the user. I've tested it and that's how I found out what is 
the exact behavior.

*I'm not sure that an in-memory or any custom state store will make it.* Yes, 
in-memory will help with the efficient append because it avoids any expensive 
call and serializations/deserializations. Nevertheless, *you will always have 
the serializations/deserializations somewhere and this is the changelog topic 
and there you have also bandwidth* (not just precious processing time). Even if 
the list is fixed to let's say only 5 items you will still have 15 (1 + 2 + 3 + 
4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - 
O(n^2). Combined with the fact that I want to provide a library to many 
different users (and duplicates count may vary a lot between usages) *to me 
it's best to implement just as in the stream-stream join - with duplicates*. 
Still, it was a great discussion and made me more confident in my decisions. 
Thank you for your assistance.

*Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and 
_TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.*

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator__#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list appending one value and writing 
> it back). So I go for 

[GitHub] [kafka] cmccabe edited a comment on pull request #8569: KIP-551: Expose disk read and write metrics

2020-04-28 Thread GitBox


cmccabe edited a comment on pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#issuecomment-620884289


   > In addition to block-level read/write, would there be a benefit to expose 
file system read/write metrics?
   
   It's better to have that discussion on the mailing list.  This PR is just 
about KIP-551.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8569: KIP-551: Expose disk read and write metrics

2020-04-28 Thread GitBox


cmccabe commented on pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#issuecomment-620884289


   > In addition to block-level read/write, would there be a benefit to expose 
file system read/write metrics?
   It's better to have that discussion on the mailing list.  This PR is just 
about KIP-551.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-28 Thread GitBox


junrao commented on pull request #8542:
URL: https://github.com/apache/kafka/pull/8542#issuecomment-620880304


   @steverod : Do the JDK 8 and Scala 2.12 tests pass for you locally? Not sure why the Jenkins test failed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#issuecomment-620879654


   Merged to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-28 Thread GitBox


junrao commented on pull request #8543:
URL: https://github.com/apache/kafka/pull/8543#issuecomment-620879490


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416947800



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final 
EmbeddedKafkaCluster cluster, final
  * @param <K> Key type of the data records
  * @param <V> Value type of the data records
  */
-@SuppressWarnings("WeakerAccess")
 public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                      final Collection<KeyValue<K, V>> records,
                                                                      final Properties producerConfig,
                                                                      final Headers headers,
                                                                      final Long timestamp,
-                                                                     final boolean enableTransactions)
-    throws ExecutionException, InterruptedException {
+                                                                     final boolean enableTransactions) {
 
     try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
         if (enableTransactions) {
             producer.initTransactions();
             producer.beginTransaction();
         }
         for (final KeyValue<K, V> record : records) {
-            final Future<RecordMetadata> f = producer.send(
-                new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-            f.get();
+            producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));

Review comment:
   Thanks. That's what I was asking for confirmation on. I realize now the 
structure of my sentence was ambiguous.
   
   I agree that the method contract is that the batch should be synchronously 
produced, not that each record should be synchronously produced, so this change 
looks good to me.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416944392



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final 
EmbeddedKafkaCluster cluster, final
  * @param <K> Key type of the data records
  * @param <V> Value type of the data records
  */
-@SuppressWarnings("WeakerAccess")
 public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                      final Collection<KeyValue<K, V>> records,
                                                                      final Properties producerConfig,
                                                                      final Headers headers,
                                                                      final Long timestamp,
-                                                                     final boolean enableTransactions)
-    throws ExecutionException, InterruptedException {
+                                                                     final boolean enableTransactions) {
 
     try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
         if (enableTransactions) {
             producer.initTransactions();
             producer.beginTransaction();
         }
         for (final KeyValue<K, V> record : records) {
-            final Future<RecordMetadata> f = producer.send(
-                new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-            f.get();
+            producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));

Review comment:
   Previously we waited after sending each record; here we only wait once after sending all records, so it is more efficient.
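   In other words, the contract becomes batch-synchronous rather than per-record-synchronous. A minimal sketch of the pattern (topic name, types, and configs are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchSyncProduceSketch {
    // send() is asynchronous per record, but the try-with-resources close()
    // flushes and blocks until every in-flight record is acknowledged, so the
    // batch as a whole is produced synchronously.
    static void produceBatch(final Properties producerConfig, final Iterable<String> values) {
        try (final Producer<String, String> producer = new KafkaProducer<>(producerConfig)) {
            for (final String value : values) {
                producer.send(new ProducerRecord<>("demo-topic", value));
            }
        }
    }
}
```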





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416943907



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
##
@@ -337,8 +336,11 @@ public void 
shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws
 TestUtils.waitForCondition(
 () -> {
 try {
-final ReadOnlyKeyValueStore store =
-
kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, 
QueryableStoreTypes.keyValueStore()));
+final ReadOnlyKeyValueStore store = 
IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+
+if (store == null)
+return false;

Review comment:
   ack.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416942992



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##
@@ -810,21 +808,9 @@ private void writeInputData(final List> records) throws Exc
 }
 
 private void verifyStateStore(final KafkaStreams streams,
-  final Set> 
expectedStoreContent) {
-ReadOnlyKeyValueStore store = null;
-
-final long maxWaitingTime = System.currentTimeMillis() + 30L;
-while (System.currentTimeMillis() < maxWaitingTime) {
-try {
-store = 
streams.store(StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore()));
-break;
-} catch (final InvalidStateStoreException okJustRetry) {
-try {
-Thread.sleep(5000L);
-} catch (final Exception ignore) { }
-}
-}
-
+  final Set> 
expectedStoreContent) throws InterruptedException {
+final ReadOnlyKeyValueStore store = IntegrationTestUtils
+.getStore(30L, storeName, streams, 
QueryableStoreTypes.keyValueStore());

Review comment:
   Ack.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] fantayeneh opened a new pull request #8577: use appropriate fn for readability. (maybe)

2020-04-28 Thread GitBox


fantayeneh opened a new pull request #8577:
URL: https://github.com/apache/kafka/pull/8577


   using the min, max might make the code a little easier to read. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9875) Flaky Test SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown[exactly_once]

2020-04-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-9875:
---

Assignee: John Roesler

> Flaky Test 
> SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown[exactly_once]
> --
>
> Key: KAFKA-9875
> URL: https://issues.apache.org/jira/browse/KAFKA-9875
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: John Roesler
>Priority: Major
>  Labels: flaky-test, unit-test
>
> h3. Stacktrace
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out. at 
> org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:211)
>  at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteAllTopicsAndWait(EmbeddedKafkaCluster.java:300)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest(IntegrationTestUtils.java:148)
>  at 
> org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown(SuppressionDurabilityIntegrationTest.java:246)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416921971



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##
@@ -810,21 +808,9 @@ private void writeInputData(final List> records) throws Exc
 }
 
 private void verifyStateStore(final KafkaStreams streams,
-  final Set> 
expectedStoreContent) {
-ReadOnlyKeyValueStore store = null;
-
-final long maxWaitingTime = System.currentTimeMillis() + 30L;
-while (System.currentTimeMillis() < maxWaitingTime) {
-try {
-store = 
streams.store(StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore()));
-break;
-} catch (final InvalidStateStoreException okJustRetry) {
-try {
-Thread.sleep(5000L);
-} catch (final Exception ignore) { }
-}
-}
-
+  final Set> 
expectedStoreContent) throws InterruptedException {
+final ReadOnlyKeyValueStore store = IntegrationTestUtils
+.getStore(30L, storeName, streams, 
QueryableStoreTypes.keyValueStore());

Review comment:
   ```suggestion
   .getStore(300_000L, storeName, streams, 
QueryableStoreTypes.keyValueStore());
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
##
@@ -337,8 +336,11 @@ public void 
shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws
 TestUtils.waitForCondition(
 () -> {
 try {
-final ReadOnlyKeyValueStore store =
-
kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, 
QueryableStoreTypes.keyValueStore()));
+final ReadOnlyKeyValueStore store = 
IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+
+if (store == null)
+return false;

Review comment:
   not a huge deal, but technically, these should have brackets.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -599,13 +595,6 @@ public static void waitForCompletion(final KafkaStreams 
streams,
 return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, 
expectedRecords, waitTime, false);
 }
 
-public static  List> 
waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,

Review comment:
   thanks for the cleanup

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final 
EmbeddedKafkaCluster cluster, final
  * @param <K> Key type of the data records
  * @param <V> Value type of the data records
  */
-@SuppressWarnings("WeakerAccess")
 public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                      final Collection<KeyValue<K, V>> records,
                                                                      final Properties producerConfig,
                                                                      final Headers headers,
                                                                      final Long timestamp,
-                                                                     final boolean enableTransactions)
-    throws ExecutionException, InterruptedException {
+                                                                     final boolean enableTransactions) {
 
     try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
         if (enableTransactions) {
             producer.initTransactions();
             producer.beginTransaction();
         }
         for (final KeyValue<K, V> record : records) {
-            final Future<RecordMetadata> f = producer.send(
-                new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-            f.get();
+            producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));

Review comment:
   I guess the flush at the end makes it synchronous anyway?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]

2020-04-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9928:
--

Assignee: Matthias J. Sax

> Flaky 
> GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
> -
>
> Key: KAFKA-9928
> URL: https://issues.apache.org/jira/browse/KAFKA-9928
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>Priority: Major
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. waiting for 
> final values
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
>   at 
> org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178)
> {code}
> I looked at the below examples:
> https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/
> And also reproduced the flakiness locally after about 180 runs, and the 
> failed one did not have any obvious different traces compared with the 
> successful ones.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]

2020-04-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9928:
---
Component/s: unit tests
 streams

> Flaky 
> GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
> -
>
> Key: KAFKA-9928
> URL: https://issues.apache.org/jira/browse/KAFKA-9928
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. waiting for 
> final values
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
>   at 
> org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178)
> {code}
> I looked at the below examples:
> https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/
> And also reproduced the flakiness locally after about 180 runs, and the 
> failed one did not have any obvious different traces compared with the 
> successful ones.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] fantayeneh opened a new pull request #8576: format with correct syntax

2020-04-28 Thread GitBox


fantayeneh opened a new pull request #8576:
URL: https://github.com/apache/kafka/pull/8576


   small change fix string formatting
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416916295



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {
-private long acceptableRecoveryLag = 100L;
-private int balanceFactor = 1;
-private int maxWarmupReplicas = 2;
-private int numStandbyReplicas = 0;
-private long probingRebalanceInterval = 60 * 1000L;
-
-private Map clientStates = new HashMap<>();
-private Set allTasks = new HashSet<>();
-private Set statefulTasks = new HashSet<>();
-
-private ClientState client1;
-private ClientState client2;
-private ClientState client3;
-
-private HighAvailabilityTaskAssignor taskAssignor;
-
-private void createTaskAssignor() {
-final AssignmentConfigs configs = new AssignmentConfigs(
-acceptableRecoveryLag,
-balanceFactor,
-maxWarmupReplicas,
-numStandbyReplicas,
-probingRebalanceInterval
-);
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-configs);
-}
+private final AssignmentConfigs configWithoutStandbys = new 
AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 0,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
+
+private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 1,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
 
-@Test
-public void 
shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() {
-client1 = EasyMock.createNiceMock(ClientState.class);
-expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0));
-expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS);
-replay(client1);
-allTasks =  mkSet(TASK_0_0, TASK_0_1);
-clientStates = singletonMap(UUID_1, client1);
-createTaskAssignor();
 
-assertFalse(taskAssignor.previousAssignmentIsValid());

Review comment:
   Since you have a follow-on PR that touches this method, I'll leave it 
alone and just proceed to merge. We should consider both of these options in 
the follow-on.
   
   Thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#issuecomment-620847306


   I looked at the three failed tests:
   
   * `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` is 
actually due to the issue that https://github.com/apache/kafka/pull/8548 tried 
to fix. Waiting for @vvcephei to review 8548
   * 
`EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]`
 is being looked at by @mjsax as KAFKA-9831
   * 
`GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]`
 is a new issue, I created KAFKA-9928 for this, and my gut feeling is that it 
has the same root cause as KAFKA-9831. (also cc @mjsax )
   
   So I think this PR is good to be merged.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support

2020-04-28 Thread GitBox


vvcephei commented on pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-620847005


   Whew! System tests passed: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-04-28--001.1588064884--ConcurrencyPractitioner--EMIT-ON-CHANGE--ddbf2cf/report.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620846462







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620846265







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416910725



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
##
@@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws 
Exception {
 
 produceGlobalTableValues();
 
-final ReadOnlyKeyValueStore replicatedStore =
-
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, 
QueryableStoreTypes.keyValueStore()));
+final ReadOnlyKeyValueStore replicatedStore = 
IntegrationTestUtils
+.getStore(globalStore, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+assertNotNull(replicatedStore);

Review comment:
   Since previously the un-wrapped call would just throw the exception, asserting non-null here is equivalent to making sure that the store is indeed returned.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]

2020-04-28 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-9928:


 Summary: Flaky 
GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
 Key: KAFKA-9928
 URL: https://issues.apache.org/jira/browse/KAFKA-9928
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


{code}
Stacktrace
java.lang.AssertionError: Condition not met within timeout 3. waiting for 
final values
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
at 
org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178)
{code}

I looked at the below examples:

https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/

https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/

https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/

And also reproduced the flakiness locally after about 180 runs, and the failed 
one did not have any obvious different traces compared with the successful ones.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-28 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9921:
---
Fix Version/s: 2.5.1

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator__#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list appending one value and writing 
> it back). So I go for _List>_ instead of _KeyValuePair List>_. The whole use case is more complex than that so I use 
> _#transformValues_ and state stores.
> So as an impact I can't use caching on my state stores. For others - they'll 
> have incorrect behavior that may take a lot of time to be discovered and even 
> more time to fix the results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] gwenshap commented on pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-28 Thread GitBox


gwenshap commented on pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#issuecomment-620826970


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-28 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094844#comment-17094844
 ] 

Sophie Blee-Goldman commented on KAFKA-9921:


I'm resolving the ticket because the PR to disable caching + duplicates and 
note this in the javadocs was just merged. If you have the chance to take a 
quick look, let me know if there's anything I missed clarifying in the docs; I 
can submit a quick follow-up PR, or review one from you if you have something 
specific in mind.

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list, appending one value, and writing 
> it back). So I go for a list of key-value pairs instead of a single key-value 
> pair holding a list. The whole use case is more complex than that, so I use 
> _#transformValues_ and state stores.
> The impact is that I can't use caching on my state stores. For others, they'll 
> get incorrect behavior that may take a lot of time to be discovered and even 
> more time to fix the results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-28 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094839#comment-17094839
 ] 

Sophie Blee-Goldman commented on KAFKA-9921:


I take it you're using RocksDB, by the way? If you are (or can) use the 
in-memory stores, then storing a list and appending should be pretty fast. On 
that note, I'm actually not sure storing the entire list would be slower than 
storing individual duplicate records even with RocksDB. I actually have a 
suspicion that it might even be faster to store it as a list, assuming the 
number and size of duplicates isn't incredibly large (relative to the memtable 
and block size scale)
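
For context, a sketch of the read-modify-write pattern the reporter wants to 
avoid (the types and store variable are illustrative):

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.state.KeyValueStore;

public class ListAppendSketch {
    // Appending requires reading, deserializing, and re-serializing the whole
    // list, so the cost per event grows linearly with the list size.
    static void append(KeyValueStore<String, List<Integer>> store, String key, int value) {
        List<Integer> current = store.get(key);
        if (current == null) {
            current = new ArrayList<>();
        }
        current.add(value);
        store.put(key, current); // rewrites the entire (possibly large) list
    }
}
{code}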

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list, appending one value, and writing 
> it back). So I go for a list of key-value pairs instead of a single key-value 
> pair holding a list. The whole use case is more complex than that, so I use 
> _#transformValues_ and state stores.
> The impact is that I can't use caching on my state stores. For others, they'll 
> get incorrect behavior that may take a lot of time to be discovered and even 
> more time to fix the results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-28 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094835#comment-17094835
 ] 

Sophie Blee-Goldman commented on KAFKA-9921:


> For 2 puts I would expect 2 entries, regardless of whether they accidentally match

Fair enough. I guess for that reason caching and duplicates are inherently 
incompatible, right?

Regarding putting _null_ values, I think the behavior with _retainDuplicates_ 
is as expected. The Streams library uses window stores with duplicates for 
stream-stream joins, for which a null value produces no output and isn't 
considered a tombstone (see [semantics of stream-stream 
joins|https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kstream-kstream-join]
 section).

I'm starting to get a better sense of what you're trying to do here, but it 
sounds like the semantics you want might differ slightly from what Streams 
would consider a stream-stream join. Do you explicitly want a windowed join, or 
are you just using the window store because the retention policy will keep 
state from growing without bound? Does your use case require _null_ values to 
be treated as deletes?

By the way, if the built-in stores don't match your requirements exactly, you 
can always plug in a custom store. You could even just wrap one of the built-in 
stores to reuse the pieces that work for you, and skip the ones that don't. The 
RocksDB WindowStore is actually just built out of segments of the RocksDB 
KeyValueStore, for example.
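
For reference, a minimal sketch of the stream-stream join semantics mentioned 
above (topic names, types, and the window size are illustrative):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> left = builder.stream("left-topic");
KStream<String, String> right = builder.stream("right-topic");

// A record with a null value on either side produces no join output and is
// not treated as a tombstone; the join windows bound how long state is kept.
left.join(right,
          (l, r) -> l + "+" + r,
          JoinWindows.of(Duration.ofMinutes(5)))
    .to("join-output");
{code}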

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list, appending one value, and writing 
> it back). So I go for a list of key-value pairs instead of a single key-value 
> pair holding a list. The whole use case is more complex than that, so I use 
> _#transformValues_ and state stores.
> The impact is that I can't use caching on my state stores. For others, they'll 
> get incorrect behavior that may take a lot of time to be discovered and even 
> more time to fix the results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-28 Thread GitBox


abbccdda commented on a change in pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#discussion_r416879514



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, security_protocol):
  version=kafka_version)
 self.producer.start()
 
+static_membership = kafka_version == DEV_BRANCH or kafka_version >= LATEST_2_3

Review comment:
   I see, makes sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed

2020-04-28 Thread GitBox


kkonstantine commented on a change in pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#discussion_r416866064



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -220,6 +220,8 @@ public void stop() {
 
 workerMetricsGroup.close();
 connectorStatusMetricsGroup.close();
+
+workerConfigTransformer.close();

Review comment:
   Looking at the initialization of `workerConfigTransformer` I see it 
should be made final. 
   
   And then I notice that this is the case for 
`connectorClientConfigOverridePolicy` and all the class members of 
`ConnectorStatusMetricsGroup`. @tombentley do you mind tightening these types 
as well?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
##
@@ -98,4 +101,8 @@ public void onCompletion(Throwable error, Void result) {
 HerderRequest request = worker.herder().restartConnector(ttl, 
connectorName, cb);
 connectorRequests.put(path, request);
 }
+
+public void close() {

Review comment:
   should we also change this class to implement `AutoCloseable`? 
   It can't be used immediately in a try-with-resources clause, but it's 
   probably better to signal the existence of this method at the class level. 
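
   A simplified, hypothetical sketch of that suggestion (not the actual Worker 
   code; the delegate field stands in for the wrapped ConfigProvider):

   public class WorkerConfigTransformerSketch implements AutoCloseable {
       private final AutoCloseable provider; // final, per the review note above

       public WorkerConfigTransformerSketch(AutoCloseable provider) {
           this.provider = provider;
       }

       @Override
       public void close() throws Exception {
           provider.close(); // ensures the wrapped provider is released
       }
   }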





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed

2020-04-28 Thread GitBox


kkonstantine commented on pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#issuecomment-620803829


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in Confluent schema registry

2020-04-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094805#comment-17094805
 ] 

Guozhang Wang commented on KAFKA-9925:
--

Ah yes!! Hope we can get KIP-591 in by 2.6 :)

> Non-key KTable Joining may result in duplicate schema name in Confluent 
> schema registry
> 
>
> Key: KAFKA-9925
> URL: https://issues.apache.org/jira/browse/KAFKA-9925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Kin Siu
>Assignee: John Roesler
>Priority: Major
>
> The second half of the issue Andy Bryant reported in KAFKA-9390 looks like 
> it still exists.
> When testing the non-key join method without passing in "Named", I noticed 
> that there are still schema subjects registered in the Confluent schema 
> registry without the consumer group id, 
> e.g. 
> {noformat}
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value"
> {noformat}
> Code in KTableImpl which constructs the above naming:
> https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959
> When we have multiple topologies using a foreign-key join and registered to 
> the same schema registry, we can have a name clash and fail to register the 
> schema. 
> In order to clean up these schema subjects, we would need to know the internal 
> naming of a consumer group's topology, which is not straightforward and 
> error-prone.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094800#comment-17094800
 ] 

Matthias J. Sax commented on KAFKA-7317:


Sweet!

> Use collections subscription for main consumer to reduce metadata
> -
>
> Key: KAFKA-7317
> URL: https://issues.apache.org/jira/browse/KAFKA-7317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.5.0
>
>
> In KAFKA-4633 we switched from "collection subscription" to "pattern 
> subscription" for `Consumer#subscribe()` to avoid triggering auto topic 
> creation on the broker. In KAFKA-5291, the metadata request was extended to 
> overwrite the broker config within the request itself. However, this feature 
> is only used in `KafkaAdminClient`. KAFKA-7320 adds this feature for the 
> consumer client, too.
> This ticket proposes to use the new feature within Kafka Streams to allow the 
> usage of collection-based subscription in the consumer and admin clients, to 
> reduce the metadata response size, which can be very large for a large number 
> of partitions in the cluster.
> Note that Streams needs to be able to distinguish whether it connects to older 
> brokers that do not support the new metadata request, and still use pattern 
> subscription in that case.
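
For illustration, the two subscription styles in plain consumer code (a 
sketch; the config values and topic name are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "streams-app");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

// pattern subscription: avoids triggering broker-side auto topic creation
consumer.subscribe(Pattern.compile("input-topic"));

// collection subscription: what this ticket proposes, with smaller metadata responses
consumer.subscribe(Collections.singletonList("input-topic"));
{code}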



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9127) Needless group coordination overhead for GlobalKTables

2020-04-28 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094801#comment-17094801
 ] 

Sophie Blee-Goldman commented on KAFKA-9127:


Yep, if you can kick off tests on that PR and give it another pass it should 
fix both issues and we can backport it to 2.5

> Needless group coordination overhead for GlobalKTables
> --
>
> Key: KAFKA-9127
> URL: https://issues.apache.org/jira/browse/KAFKA-9127
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Chris Toomey
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> When creating a simple stream topology to just populate a GlobalKTable, I 
> noticed from logging that the stream consumer was doing group coordination 
> requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to 
> do since the global consumer thread populating the table fetches from all 
> partitions and thus doesn't use the group requests. So this adds needless 
> overhead on the client, network, and server.
> I tracked this down to the stream thread consumer, which is created 
> regardless of whether it's needed, based solely on NUM_STREAM_THREADS_CONFIG, 
> which defaults to 1.
> I found that setting NUM_STREAM_THREADS_CONFIG to 0 will prevent this from 
> happening, but it'd be a worthwhile improvement to be able to override this 
> setting in cases of topologies like this that don't have any need for stream 
> threads. Hence this ticket.
> I originally asked about this on the users mailing list where Bruno suggested 
> I file it as an improvement request.
> Here's the Scala code that I'm using that exhibits this:
> {code:scala}
> val builder: StreamsBuilder = new StreamsBuilder()
> val gTable = builder.globalTable[K, V](...)
> val stream = new KafkaStreams(builder.build(), props)
> stream.start(){code}
>  Not shown is the state store that I'm populating/using.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in Confluent schema registry

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094799#comment-17094799
 ] 

Matthias J. Sax commented on KAFKA-9925:


{quote}I'm wondering if now is a good time to deprecate the 
`StreamsBuilder#build()` function to let users use `build(final Properties 
props)` instead as a tiny KIP.
{quote}
Just FYI: this is already proposed in KIP-591.

However, IMHO, we should fix it for older versions, too?

> Non-key KTable Joining may result in duplicate schema name in Confluent 
> schema registry
> 
>
> Key: KAFKA-9925
> URL: https://issues.apache.org/jira/browse/KAFKA-9925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Kin Siu
>Assignee: John Roesler
>Priority: Major
>
> The second half of the issue Andy Bryant reported in KAFKA-9390 looks like 
> it still exists.
> When testing the non-key join method without passing in "Named", I noticed 
> that there are still schema subjects registered in the Confluent schema 
> registry without the consumer group id, 
> e.g. 
> {noformat}
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value"
> {noformat}
> Code in KTableImpl which constructs the above naming:
> https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959
> When we have multiple topologies using a foreign-key join and registered to 
> the same schema registry, we can have a name clash and fail to register the 
> schema. 
> In order to clean up these schema subjects, we would need to know the internal 
> naming of a consumer group's topology, which is not straightforward and 
> error-prone.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on pull request #8204: Ensure ConfigProviders are closed

2020-04-28 Thread GitBox


kkonstantine commented on pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#issuecomment-620795402


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata

2020-04-28 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094797#comment-17094797
 ] 

Sophie Blee-Goldman commented on KAFKA-7317:


Also fixed via [https://github.com/apache/kafka/pull/8540]

> Use collections subscription for main consumer to reduce metadata
> -
>
> Key: KAFKA-7317
> URL: https://issues.apache.org/jira/browse/KAFKA-7317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.5.0
>
>
> In KAFKA-4633 we switched from "collection subscription" to "pattern 
> subscription" for `Consumer#subscribe()` to avoid triggering auto topic 
> creation on the broker. In KAFKA-5291, the metadata request was extended to 
> overwrite the broker config within the request itself. However, this feature 
> is only used in `KafkaAdminClient`. KAFKA-7320 adds this feature for the 
> consumer client, too.
> This ticket proposes to use the new feature within Kafka Streams to allow the 
> usage of collection-based subscription in the consumer and admin clients, to 
> reduce the metadata response size, which can be very large for a large number 
> of partitions in the cluster.
> Note that Streams needs to be able to distinguish whether it connects to older 
> brokers that do not support the new metadata request, and still use pattern 
> subscription in that case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9127) Needless group coordination overhead for GlobalKTables

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094791#comment-17094791
 ] 

Matthias J. Sax commented on KAFKA-9127:


{quote}Does it qualify as a regression when the workaround is the same as the 
fix?
{quote}
IMHO, it does, because if you don't change any code/configs and upgrade to 2.5 
it breaks.

Btw: setting the number of threads to zero exposes a different bug: the client 
does not transition to state RUNNING.
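
For reference, the workaround under discussion as a sketch (the app id and 
bootstrap servers are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-table-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// no stream threads: skips group coordination; the global thread still
// populates the GlobalKTable (but see the state-transition bug noted above)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
{code}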

> Needless group coordination overhead for GlobalKTables
> --
>
> Key: KAFKA-9127
> URL: https://issues.apache.org/jira/browse/KAFKA-9127
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Chris Toomey
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> When creating a simple stream topology to just populate a GlobalKTable, I 
> noticed from logging that the stream consumer was doing group coordination 
> requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to 
> do since the global consumer thread populating the table fetches from all 
> partitions and thus doesn't use the group requests. So this adds needless 
> overhead on the client, network, and server.
> I tracked this down to the stream thread consumer, which is created 
> regardless of whether it's needed, based solely on NUM_STREAM_THREADS_CONFIG, 
> which defaults to 1.
> I found that setting NUM_STREAM_THREADS_CONFIG to 0 will prevent this from 
> happening, but it'd be a worthwhile improvement to be able to override this 
> setting in cases of topologies like this that don't have any need for stream 
> threads. Hence this ticket.
> I originally asked about this on the users mailing list where Bruno suggested 
> I file it as an improvement request.
> Here's the Scala code that I'm using that exhibits this:
> {code:scala}
> val builder: StreamsBuilder = new StreamsBuilder()
> val gTable = builder.globalTable[K, V](...)
> val stream = new KafkaStreams(builder.build(), props)
> stream.start(){code}
>  Not shown is the state store that I'm populating/using.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9127) Needless group coordination overhead for GlobalKTables

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092826#comment-17092826
 ] 

Matthias J. Sax edited comment on KAFKA-9127 at 4/28/20, 6:54 PM:
--

[~ableegoldman] Seems we introduced a regression in 2.5.0 via KAFKA-7317 that 
would be fixed with this ticket (cf. 
[https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic]).
 If you agree, we should cherry-pick the fix to 2.5 branch. And also add a 
corresponding test.

Thoughts?


was (Author: mjsax):
[~ableegoldman] Seems we introduced a regression in 2.5.0 via KAFKA-7317 that 
would be fixed with this ticket (cf. 
[https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic]).
 If you agree, we should cherry-pick the fix ti 2.5 branch. And also add a 
corresponding test.

Thoughts?

> Needless group coordination overhead for GlobalKTables
> --
>
> Key: KAFKA-9127
> URL: https://issues.apache.org/jira/browse/KAFKA-9127
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Chris Toomey
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> When creating a simple stream topology to just populate a GlobalKTable, I 
> noticed from logging that the stream consumer was doing group coordination 
> requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to 
> do since the global consumer thread populating the table fetches from all 
> partitions and thus doesn't use the group requests. So this adds needless 
> overhead on the client, network, and server.
> I tracked this down to the stream thread consumer, which is created 
> regardless of whether it's needed, based solely on NUM_STREAM_THREADS_CONFIG, 
> which defaults to 1.
> I found that setting NUM_STREAM_THREADS_CONFIG to 0 will prevent this from 
> happening, but it'd be a worthwhile improvement to be able to override this 
> setting in cases of topologies like this that don't have any need for stream 
> threads. Hence this ticket.
> I originally asked about this on the users mailing list where Bruno suggested 
> I file it as an improvement request.
> Here's the Scala code that I'm using that exhibits this:
> {code:scala}
> val builder: StreamsBuilder = new StreamsBuilder()
> val gTable = builder.globalTable[K, V](...)
> val stream = new KafkaStreams(builder.build(), props)
> stream.start(){code}
>  Not shown is the state store that I'm populating/using.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094781#comment-17094781
 ] 

Matthias J. Sax commented on KAFKA-7317:


As reported on SO, when setting the number of threads to zero, the client 
state never goes to RUNNING. Sounds like another bug?

> Use collections subscription for main consumer to reduce metadata
> -
>
> Key: KAFKA-7317
> URL: https://issues.apache.org/jira/browse/KAFKA-7317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.5.0
>
>
> In KAFKA-4633 we switched from "collection subscription" to "pattern 
> subscription" for `Consumer#subscribe()` to avoid triggering auto topic 
> creation on the broker. In KAFKA-5291, the metadata request was extended to 
> overwrite the broker config within the request itself. However, this feature 
> is only used in `KafkaAdminClient`. KAFKA-7320 adds this feature for the 
> consumer client, too.
> This ticket proposes to use the new feature within Kafka Streams to allow the 
> usage of collection-based subscription in the consumer and admin clients, to 
> reduce the metadata response size, which can be very large for a large number 
> of partitions in the cluster.
> Note that Streams needs to be able to distinguish whether it connects to older 
> brokers that do not support the new metadata request, and still use pattern 
> subscription in that case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9916) Materialize Table-Table Join Result to Avoid Performing Same Join Twice

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094777#comment-17094777
 ] 

Matthias J. Sax commented on KAFKA-9916:


The original example was slightly different:
{code:java}
KStream stream = ...
stream.filter((k,v) -> { v.setA("a"); return true; });
stream.filter((k,v) -> ...);{code}
For this case, the filters are not chained but executed in parallel, which is 
basically a broadcast pattern, i.e., each record of `stream` is piped into both 
filters; conceptually, we would need to duplicate the input record, but as an 
optimization we don't copy it and only pass the same object twice.

> Materialize Table-Table Join Result to Avoid Performing Same Join Twice
> ---
>
> Key: KAFKA-9916
> URL: https://issues.apache.org/jira/browse/KAFKA-9916
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Priority: Major
>
> If a table-table join processor performs a join and the join needs to forward 
> downstream the old join result (e.g. due to an aggregation operation 
> downstream), it performs the same join (i.e. calls the {{ValueJoiner}}) twice.
> Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}} 
> with the same keys and input into the join operation in this order, the join 
> processor at some point will join {{L1}} with {{R1}}. When the new right 
> value {{R2}} triggers the join, it will join {{L1}} with {{R2}} and again 
> {{L1}} with {{R1}}.
> We could avoid calling the {{ValueJoiner}} twice by materializing the join 
> result. We would trade a call to the {{ValueJoiner}} with a lookup into a 
> state store. Depending on the logic in the {{ValueJoiner}} this may or may 
> not improve the performance. However, calling the {{ValueJoiner}} once will 
> only access the input values of the {{ValueJoiner}} once, which avoids the 
> need to copy the input values each time the {{ValueJoiner}} is called. For 
> example, consider the following {{ValueJoiner}}:
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
> leftValue.setSomeValue(rightValue);
> return leftValue;
> }
> {code}
> With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice 
> when {{R2}} triggers the join, the first time with {{R2}} and the second time 
> with {{R1}}. That means {{R2}} will be overwritten by {{R1}}, which is 
> probably not what the users want. To get the correct result, the 
> {{ValueJoiner}} should be implemented as follows:
>   
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
> ComplexValue copy = copy(leftValue);
> copy.setSomeValue(rightValue);
> return copy;
> }
> {code}
> Copying values during joins could be avoided if the join result were 
> materialized. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

2020-04-28 Thread GitBox


guozhangwang commented on pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#issuecomment-620787049


   cc @abbccdda @mjsax to take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in Confluent schema registry

2020-04-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094772#comment-17094772
 ] 

Guozhang Wang commented on KAFKA-9925:
--

[~vvcephei] Thanks for getting a look into this issue.

I'm wondering if now is a good time to deprecate the `StreamsBuilder#build()` 
function and let users use `build(final Properties props)` instead, as a tiny 
KIP. There's a risk, of course, that the props passed into `build` are not the 
same as the ones passed into the `KafkaStreams` constructor. I think we can 
remember the reference to the props when building the topology, and then at 
construction, if we find they are not the same (by reference), we can log a 
warning such as "the topology was built with some StreamsConfig already, which 
is not the same as the config passed into the constructor".
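
For illustration, the suggested usage as a sketch (the config values are 
placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();
// ... define the topology ...

// pass the same props to build() and to the constructor, as discussed above
Topology topology = builder.build(props);
KafkaStreams streams = new KafkaStreams(topology, props);
{code}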

> Non-key KTable Joining may result in duplicate schema name in Confluent 
> schema registry
> 
>
> Key: KAFKA-9925
> URL: https://issues.apache.org/jira/browse/KAFKA-9925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Kin Siu
>Assignee: John Roesler
>Priority: Major
>
> The second half of the issue Andy Bryant reported in KAFKA-9390 looks like 
> it still exists.
> When testing the non-key join method without passing in "Named", I noticed 
> that there are still schema subjects registered in the Confluent schema 
> registry without the consumer group id, 
> e.g. 
> {noformat}
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value"
> {noformat}
> Code in KTableImpl which constructs the above naming:
> https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959
> When we have multiple topologies using a foreign-key join and registered to 
> the same schema registry, we can have a name clash and fail to register the 
> schema. 
> In order to clean up these schema subjects, we would need to know the internal 
> naming of a consumer group's topology, which is not straightforward and 
> error-prone.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8564: KAFKA-9921: disable caching on stores configured to retain duplicates

2020-04-28 Thread GitBox


guozhangwang commented on pull request #8564:
URL: https://github.com/apache/kafka/pull/8564#issuecomment-620783397


   Also cherry-picked to 2.5.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


ableegoldman commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416814163



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
##
@@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws 
Exception {
 
 produceGlobalTableValues();
 
-final ReadOnlyKeyValueStore replicatedStore =
-
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, 
QueryableStoreTypes.keyValueStore()));
+final ReadOnlyKeyValueStore replicatedStore = 
IntegrationTestUtils
+.getStore(globalStore, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+assertNotNull(replicatedStore);

Review comment:
   Why do we have to check for null now?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in Confluent schema registry

2020-04-28 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094729#comment-17094729
 ] 

John Roesler commented on KAFKA-9925:
-

Ok, I've opened [https://github.com/apache/kafka/pull/8574] . If you have the 
time, a review would help speed things along. Thanks for the report!

> Non-key KTable Joining may result in duplicate schema name in Confluent 
> schema registry
> 
>
> Key: KAFKA-9925
> URL: https://issues.apache.org/jira/browse/KAFKA-9925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Kin Siu
>Assignee: John Roesler
>Priority: Major
>
> The second half of the issue Andy Bryant reported in KAFKA-9390 looks like 
> it still exists.
> When testing the non-key join method without passing in "Named", I noticed 
> that there are still schema subjects registered in the Confluent schema 
> registry without the consumer group id, 
> e.g. 
> {noformat}
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value"
> {noformat}
> Code in KTableImpl which constructs the above naming:
> https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959
> When we have multiple topologies using a foreign-key join and registered to 
> the same schema registry, we can have a name clash and fail to register the 
> schema. 
> In order to clean up these schema subjects, we would need to know the internal 
> naming of a consumer group's topology, which is not straightforward and 
> error-prone.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-28 Thread GitBox


ableegoldman commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416810713



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {
-private long acceptableRecoveryLag = 100L;
-private int balanceFactor = 1;
-private int maxWarmupReplicas = 2;
-private int numStandbyReplicas = 0;
-private long probingRebalanceInterval = 60 * 1000L;
-
-private Map clientStates = new HashMap<>();
-private Set allTasks = new HashSet<>();
-private Set statefulTasks = new HashSet<>();
-
-private ClientState client1;
-private ClientState client2;
-private ClientState client3;
-
-private HighAvailabilityTaskAssignor taskAssignor;
-
-private void createTaskAssignor() {
-final AssignmentConfigs configs = new AssignmentConfigs(
-acceptableRecoveryLag,
-balanceFactor,
-maxWarmupReplicas,
-numStandbyReplicas,
-probingRebalanceInterval
-);
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-configs);
-}
+private final AssignmentConfigs configWithoutStandbys = new 
AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 0,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
+
+private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 1,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
 
-@Test
-public void 
shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() {
-client1 = EasyMock.createNiceMock(ClientState.class);
-expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0));
-expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS);
-replay(client1);
-allTasks =  mkSet(TASK_0_0, TASK_0_1);
-clientStates = singletonMap(UUID_1, client1);
-createTaskAssignor();
 
-assertFalse(taskAssignor.previousAssignmentIsValid());

Review comment:
   Or just remove it completely  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


ableegoldman commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416809469



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##
@@ -163,8 +159,10 @@ public void shouldApplyUpdatesToStandbyStore() throws 
Exception {
 // Assert that all messages in the second batch were processed in a 
timely manner
 assertThat(semaphore.tryAcquire(batch2NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));

Review comment:
   `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` still 
failed on [one of the 
builds](https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/OptimizedKTableIntegrationTest/shouldApplyUpdatesToStandbyStore/)
 at this line :/
   But at least we got farther into the test before it failed, so I'd say this 
is still an improvement  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9923) Join window store duplicates can be compacted in changelog

2020-04-28 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9923:
---
Fix Version/s: 2.6.0

> Join window store duplicates can be compacted in changelog 
> ---
>
> Key: KAFKA-9923
> URL: https://issues.apache.org/jira/browse/KAFKA-9923
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Critical
> Fix For: 2.6.0
>
>
> Stream-stream joins use the regular `WindowStore` implementation but with 
> `retainDuplicates` set to true. To allow for duplicates while using the same 
> unique-key underlying stores, we just wrap the key with an incrementing 
> sequence number before inserting it.
> This wrapping occurs at the innermost layer of the store hierarchy, which 
> means the duplicates must first pass through the changelogging layer. At this 
> point the keys are still identical. So, we end up sending the records to the 
> changelog without distinct keys and therefore may lose the older of the 
> duplicates during compaction.
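
Conceptually, the wrapping looks like the following sketch (the names are 
illustrative, not the actual store internals):

{code:java}
import java.nio.ByteBuffer;

// Each duplicate gets a distinct serialized key by appending a sequence number.
// Because this happens at the innermost store layer, i.e. below the
// changelogging layer, the changelog still sees the identical unwrapped keys.
static byte[] wrapWithSeqnum(byte[] serializedKey, int seqnum) {
    return ByteBuffer.allocate(serializedKey.length + Integer.BYTES)
                     .put(serializedKey)
                     .putInt(seqnum)
                     .array();
}
{code}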



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9923) Join window store duplicates can be compacted in changelog

2020-04-28 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9923:
---
Priority: Blocker  (was: Critical)

> Join window store duplicates can be compacted in changelog 
> ---
>
> Key: KAFKA-9923
> URL: https://issues.apache.org/jira/browse/KAFKA-9923
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Stream-stream joins use the regular `WindowStore` implementation but with 
> `retainDuplicates` set to true. To allow for duplicates while using the same 
> unique-key underlying stores, we just wrap the key with an incrementing 
> sequence number before inserting it.
> This wrapping occurs at the innermost layer of the store hierarchy, which 
> means the duplicates must first pass through the changelogging layer. At this 
> point the keys are still identical. So, we end up sending the records to the 
> changelog without distinct keys and therefore may lose the older of the 
> duplicates during compaction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11

2020-04-28 Thread GitBox


hachikuji commented on pull request #8562:
URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749977


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11

2020-04-28 Thread GitBox


hachikuji commented on pull request #8562:
URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749686


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11

2020-04-28 Thread GitBox


hachikuji commented on pull request #8562:
URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749823


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] pan3793 commented on pull request #7112: KAFKA-8713: JsonConverter NULL Values are replaced by default values even in NULLABLE fields

2020-04-28 Thread GitBox


pan3793 commented on pull request #7112:
URL: https://github.com/apache/kafka/pull/7112#issuecomment-620746182


   I did a new implementation at https://github.com/apache/kafka/pull/8575 following 
[KIP-581](https://cwiki.apache.org/confluence/display/KAFKA/KIP-581:+Value+of+optional+null+field+which+has+default+value);
 this PR is deprecated and I will close it soon.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] pan3793 opened a new pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null

2020-04-28 Thread GitBox


pan3793 opened a new pull request #8575:
URL: https://github.com/apache/kafka/pull/8575


   https://issues.apache.org/jira/browse/KAFKA-8713
   
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-581:+Value+of+optional+null+field+which+has+default+value
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
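   
   A hypothetical sketch of how the proposed flag might be configured (the key 
   name comes from the PR title; the exact semantics are defined by KIP-581):
   
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.connect.json.JsonConverter;
   
   Map<String, Object> config = new HashMap<>();
   config.put("schemas.enable", true);
   // proposed flag: keep null for optional fields instead of substituting the default
   config.put("accept.optional.null", true);
   JsonConverter converter = new JsonConverter();
   converter.configure(config, false); // false = configure as a value converter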
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



