[GitHub] [kafka] cnZach commented on pull request #9236: MINOR: Log warn message with details when there's kerberos login issue

2020-09-01 Thread GitBox


cnZach commented on pull request #9236:
URL: https://github.com/apache/kafka/pull/9236#issuecomment-685306709


   Hi @omkreddy or @rajinisivaram, what do you think about this? I just want to 
print out the error. It will be helpful for troubleshooting Kerberos login 
issues.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6585) Consolidate duplicated logic on reset tools

2020-09-01 Thread Mani Jindal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188984#comment-17188984
 ] 

Mani Jindal commented on KAFKA-6585:


Hi [~guozhang], can you please point me to the streams reset tool class? Is it 
core/src/main/scala/kafka/tools/StreamsResetter.java?

> Consolidate duplicated logic on reset tools
> ---
>
> Key: KAFKA-6585
> URL: https://issues.apache.org/jira/browse/KAFKA-6585
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Mani Jindal
>Priority: Minor
>  Labels: newbie
>
> The consumer reset tool and the streams reset tool today share a lot of common 
> logic, such as resetting to a datetime. We can consolidate that logic into a 
> common class which directly depends on the admin client, and simply let these 
> tools use that class.
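
As a rough illustration of the kind of shared helper the ticket describes (a sketch only; the class and method names below are hypothetical, not existing Kafka code), resetting a group's offsets to a datetime can be expressed directly against the Admin client via listOffsets + alterConsumerGroupOffsets:

{code:java}
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical shared helper that both reset tools could delegate to.
public class OffsetResetHelper {

    private final Admin admin;

    public OffsetResetHelper(final Admin admin) {
        this.admin = admin;
    }

    // Reset the group's committed offsets to the earliest offsets whose
    // timestamps are >= the given datetime.
    public void resetToDatetime(final String groupId,
                                final Set<TopicPartition> partitions,
                                final Instant datetime) throws Exception {
        final Map<TopicPartition, OffsetSpec> request = partitions.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.forTimestamp(datetime.toEpochMilli())));

        final Map<TopicPartition, ListOffsetsResultInfo> found = admin.listOffsets(request).all().get();

        // Note: partitions with no record at/after the timestamp come back with
        // offset -1 and would need explicit handling in a real tool.
        final Map<TopicPartition, OffsetAndMetadata> newOffsets = found.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset())));

        admin.alterConsumerGroupOffsets(groupId, newOffsets).all().get();
    }
}
{code}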



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ocadaruma opened a new pull request #9242: MINOR: Fix misleading doc for replication quota

2020-09-01 Thread GitBox


ocadaruma opened a new pull request #9242:
URL: https://github.com/apache/kafka/pull/9242


   - The current docs for the replication quota configs are a bit confusing because 
they contain the term "(for each topic)", although the quotas actually act as an 
upper bound on the "total" replication traffic of the replicas listed in 
`{leader/follower}.replication.throttled.replicas`.
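
   For context (my illustration, not part of this PR; the broker id `0`, topic name `my-topic`, rates, and replica lists are made-up example values): the broker-level `leader.replication.throttled.rate` / `follower.replication.throttled.rate` set the total throttle, and the topic-level `*.replication.throttled.replicas` lists only select which replicas count against it. A minimal sketch with the Java Admin client:

   ```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ReplicationThrottleSketch {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092"))) {
            // Broker-level cap on the *total* throttled replication traffic (bytes/sec), not a per-topic cap.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            // Topic-level "partition:broker" lists selecting which replicas count against the throttle.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");

            Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
                broker, Arrays.asList(
                    new AlterConfigOp(new ConfigEntry("leader.replication.throttled.rate", "10485760"), AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("follower.replication.throttled.rate", "10485760"), AlterConfigOp.OpType.SET)),
                topic, Arrays.asList(
                    new AlterConfigOp(new ConfigEntry("leader.replication.throttled.replicas", "0:0,1:0"), AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("follower.replication.throttled.replicas", "0:1,1:1"), AlterConfigOp.OpType.SET)));

            admin.incrementalAlterConfigs(configs).all().get();
        }
    }
}
   ```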
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10457) JsonConverter.toConnectData trims BigInteger to Long for schema-less case

2020-09-01 Thread Oleksandr Diachenko (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Diachenko updated KAFKA-10457:

Summary: JsonConverter.toConnectData trims BigInteger to Long for 
schema-less case  (was: JsonConverter.toConnectData trims BigInteger to Double 
for schema-less case)

> JsonConverter.toConnectData trims BigInteger to Long for schema-less case
> -
>
> Key: KAFKA-10457
> URL: https://issues.apache.org/jira/browse/KAFKA-10457
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
> Fix For: 2.7.0
>
>
>  
> When _JsonConverter_ is configured with _schemas.enable=false_ and a value 
> exceeding _Double_ is passed, the result is incorrect since the converter 
> trims it to _Double_:
> {code:java}
> Map props = Collections.singletonMap("schemas.enable", 
> false);
> converter.configure(props, true);
> BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new 
> BigInteger("1"));
> String msg = value.toString();
> SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
> msg.getBytes());
> assertNull(schemaAndValue.schema());
> assertEquals(value, schemaAndValue.value());
> {code}
>  
>  Fails with:
>  
> {code:java}
> expected:<9223372036854775808> but was:<-9223372036854775808>
> Expected :9223372036854775808
> Actual :-9223372036854775808
> {code}
>  
>  
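
The reported value is the standard two's-complement wrap-around you get when a JSON integer larger than Long.MAX_VALUE is narrowed to a {{long}}. A standalone Jackson snippet (an illustration of the overflow only, not the converter's actual code path) reproduces the same number:

{code:java}
import java.math.BigInteger;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class BigIntegerOverflowDemo {
    public static void main(String[] args) throws Exception {
        // Long.MAX_VALUE + 1, the same value as in the test above.
        String json = "9223372036854775808";
        JsonNode node = new ObjectMapper().readTree(json);

        System.out.println(node.isBigInteger());               // true: Jackson keeps the full precision
        System.out.println(node.bigIntegerValue());            // 9223372036854775808
        System.out.println(node.longValue());                  // -9223372036854775808 after narrowing to long
        System.out.println(new BigInteger(json).longValue());  // same wrap-around
    }
}
{code}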



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10457) JsonConverter.toConnectData trims BigInteger to Double for schema-less case

2020-09-01 Thread Oleksandr Diachenko (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Diachenko updated KAFKA-10457:

Priority: Critical  (was: Major)

> JsonConverter.toConnectData trims BigInteger to Double for schema-less case
> ---
>
> Key: KAFKA-10457
> URL: https://issues.apache.org/jira/browse/KAFKA-10457
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
> Fix For: 2.7.0
>
>
>  
> When _JsonConverter_ is configured with _schemas.enable=false_ and a value 
> exceeding _Double_ is passed, the result is incorrect since the converter 
> trims it to _Double_:
> {code:java}
> Map props = Collections.singletonMap("schemas.enable", 
> false);
> converter.configure(props, true);
> BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new 
> BigInteger("1"));
> String msg = value.toString();
> SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
> msg.getBytes());
> assertNull(schemaAndValue.schema());
> assertEquals(value, schemaAndValue.value());
> {code}
>  
>  Fails with:
>  
> {code:java}
> expected:<9223372036854775808> but was:<-9223372036854775808>
> Expected :9223372036854775808
> Actual :-9223372036854775808
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10457) JsonConverter.toConnectData trims BigInteger to Double for schema-less case

2020-09-01 Thread Oleksandr Diachenko (Jira)
Oleksandr Diachenko created KAFKA-10457:
---

 Summary: JsonConverter.toConnectData trims BigInteger to Double 
for schema-less case
 Key: KAFKA-10457
 URL: https://issues.apache.org/jira/browse/KAFKA-10457
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Oleksandr Diachenko
Assignee: Oleksandr Diachenko
 Fix For: 2.7.0


 

When _JsonConverter_ is configured with _schemas.enable=false_ and a value 
exceeding _Double_ is passed, the result is incorrect since the converter trims 
it to _Double_:
{code:java}
Map props = Collections.singletonMap("schemas.enable", false);
converter.configure(props, true);
BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new BigInteger("1"));
String msg = value.toString();
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
assertNull(schemaAndValue.schema());
assertEquals(value, schemaAndValue.value());
{code}
 

 Fails with:

 
{code:java}
expected:<9223372036854775808> but was:<-9223372036854775808>
Expected :9223372036854775808
Actual :-9223372036854775808
{code}
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…

2020-09-01 Thread GitBox


chia7712 commented on pull request #9223:
URL: https://github.com/apache/kafka/pull/9223#issuecomment-685290466


   the failed build (jdk8 + scala 2.12) is related to #8955



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-09-01 Thread GitBox


chia7712 commented on pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#issuecomment-685290261


   ```
   [Warn] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:22:
 imported `Serdes' is permanently hidden by definition of object Serdes in 
package scala
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:136:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:137:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:142:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:142:
 value javaLongSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:145:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:145:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:153:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:153:
 value javaLongSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:153:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:163:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:163:
 value javaLongSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:167:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala:167:
 value javaLongSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Warn] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala:45:
 imported `Serdes' is permanently hidden by definition of object Serdes in 
package scala
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala:358:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala:358:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala:368:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala:368:
 value intSerde is not a member of object org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala:370:
 value stringSerde is not a member of object 
org.apache.kafka.streams.scala.Serdes
   [Error] 
/home/chia7712/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala:370:
 value 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r481602010



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,23 +58,22 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
-// so ignore unless it is a left join

Review comment:
   It looks like we removed this comment about not ignoring it if it's a 
left join, but we didn't actually remove the code for that (yet). Which one is 
right? It seems like the comment is correct, and we shouldn't ignore the null 
key regardless of whether it is a left join. In that case, we should remove the 
`leftJoin`  part of the condition on line 78 below





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9241: Update the javadoc in GroupMetadataManager.scala

2020-09-01 Thread GitBox


showuon commented on pull request #9241:
URL: https://github.com/apache/kafka/pull/9241#issuecomment-685260150


   @guozhangwang , could you help review this small PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #9241: Update the javadoc in GroupMetadataManager.scala

2020-09-01 Thread GitBox


showuon opened a new pull request #9241:
URL: https://github.com/apache/kafka/pull/9241


   Add and update the Javadoc in `GroupMetadataManager.scala`:
   1. add missing parameters
   2. rename a parameter for better readability and consistency (change 
from `group` to `groupId`)
   3. fix an incorrect return description
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r481521637



##
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##
@@ -150,13 +185,25 @@
  * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
  * @throws InvalidStateStoreException if the store is not initialized
  */
-@SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
+// note, this method must be kept if super#fetchAll(...) is removed
+@SuppressWarnings("deprecation")
 KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
 
 @Override
-default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) {
+default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant timeFrom, final Instant timeTo) {
     return fetchAll(
-        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
-        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
+        ApiUtils.validateMillisecondInstant(timeFrom, prepareMillisCheckFailMsgPrefix(timeFrom, "timeFrom")),
+        ApiUtils.validateMillisecondInstant(timeTo, prepareMillisCheckFailMsgPrefix(timeTo, "timeTo")));
+}
+
+default KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom, final long timeTo) {

Review comment:
   Idk, the current defaults make sense to me. If a user has a custom store 
and wants to use the new `backwardFetchAll` with both longs and Instants, all 
they'd have to do is override the long-based `backwardFetchAll` method (they 
have to implement the long version no matter what, since this is what gets used 
internally to Streams). If we just throw UnsupportedOperationException directly 
from the default implementation of the Instant-based `backwardFetchAll`, then 
they would have to override that as well in their custom store. So we should 
just let the Instant default to the long method so users only have to implement 
one method instead of two (plus they would have to do the Instant validation 
themselves, etc)
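
   To make the trade-off concrete, here is a rough sketch (my own illustration of the default-method pattern, not the real `WindowStore` interface) of what implementors would see: only the long-based method has to be overridden, and the Instant-based default delegates to it.

   ```java
import java.time.Instant;

// Simplified stand-in for the WindowStore defaults being discussed.
interface SketchWindowStore<V> {

    // Streams itself calls the long-based variant, so a custom store must
    // implement it in any case.
    default V backwardFetchAll(long timeFrom, long timeTo) {
        throw new UnsupportedOperationException("backward iteration not supported");
    }

    // The Instant-based variant only converts/validates and delegates, so
    // overriding the long variant is enough to get both call styles.
    default V backwardFetchAll(Instant timeFrom, Instant timeTo) {
        return backwardFetchAll(timeFrom.toEpochMilli(), timeTo.toEpochMilli());
    }
}

public class CustomStoreSketch implements SketchWindowStore<String> {

    @Override
    public String backwardFetchAll(final long timeFrom, final long timeTo) {
        return "records from " + timeTo + " back to " + timeFrom;
    }

    public static void main(String[] args) {
        SketchWindowStore<String> store = new CustomStoreSketch();
        // Both the long- and Instant-based calls work with a single override.
        System.out.println(store.backwardFetchAll(0L, 100L));
        System.out.println(store.backwardFetchAll(Instant.ofEpochMilli(0), Instant.ofEpochMilli(100)));
    }
}
   ```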





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r481519774



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
##
@@ -136,34 +174,64 @@
  * <p>
  * This iterator must be closed after use.
  *
- * @param from      the first key in the range
- * @param to        the last key in the range
- * @param fromTime  time range start (inclusive)
- * @param toTime    time range end (inclusive)
- * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+ * @param from     the first key in the range
+ * @param to       the last key in the range
+ * @param timeFrom time range start (inclusive), where iteration starts.
+ * @param timeTo   time range end (inclusive), where iteration ends.
+ * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time.
  * @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException If {@code null} is used for any key.
- * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
+ * @throws NullPointerException       If {@code null} is used for any key.
+ * @throws IllegalArgumentException   if duration is negative or can't be represented as {@code long milliseconds}
  */
-KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime)
+KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant timeFrom, Instant timeTo)
     throws IllegalArgumentException;
 
 /**
-* Gets all the key-value pairs in the existing windows.
-*
-* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
-* @throws InvalidStateStoreException if the store is not initialized
-*/
+ * Get all the key-value pairs in the given key range and time range from all the existing windows
+ * in backward order with respect to time (from end to beginning of time).
+ * <p>
+ * This iterator must be closed after use.
+ *
+ * @param from     the first key in the range
+ * @param to       the last key in the range
+ * @param timeFrom time range start (inclusive), where iteration ends.
+ * @param timeTo   time range end (inclusive), where iteration starts.
+ * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from end to beginning of time.
+ * @throws InvalidStateStoreException if the store is not initialized
+ * @throws NullPointerException       If {@code null} is used for any key.
+ * @throws IllegalArgumentException   if duration is negative or can't be represented as {@code long milliseconds}
+ */
+KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, Instant timeFrom, Instant timeTo)

Review comment:
   Yeah, @jeqo noticed that back in the KeyValueStore PR. That's a good 
point about clarifying this in the javadocs; I'm not sure whether it makes sense 
to do so on the ReadOnlyWindowStore methods directly vs. on some IQ-related 
method/class. Maybe we can just do a quick follow-up PR to shore up the 
documentation here without blocking this PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx commented on pull request #9240: https://issues.apache.org/jira/browse/KAFKA-10456

2020-09-01 Thread GitBox


huxihx commented on pull request #9240:
URL: https://github.com/apache/kafka/pull/9240#issuecomment-685219138


   @omkreddy please review this minor change. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx opened a new pull request #9240: https://issues.apache.org/jira/browse/KAFKA-10456

2020-09-01 Thread GitBox


huxihx opened a new pull request #9240:
URL: https://github.com/apache/kafka/pull/9240


   Fix typo in description of ConsoleProducer.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r481518121



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -118,24 +121,28 @@ public void process(final K key, final V value) {
 }
 
 final long timestamp = context().timestamp();
-//don't process records that don't fall within a full sliding 
window
-if (timestamp < windows.timeDifferenceMs()) {
-log.warn(
-"Skipping record due to early arrival. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
-value, context().topic(), context().partition(), 
context().offset()
-);
-droppedRecordsSensor.record();
-return;
+Method backwardsIterator = null;
+
+try {
+windowStore.getClass().getMethod("backwardFetch", new Class[] 
{ Object.class, Object.class, Instant.class, Instant.class });
+} catch (NoSuchMethodException | SecurityException e)  { }
+if (backwardsIterator != null) {
+processReverse(key, value, timestamp);
+} else {
+processInOrder(key, value, timestamp);

Review comment:
   1) I'm not sure I understand the usage of `backwardsIterator` here. Do 
we ever set it to anything?
   2) I think you're overcomplicating this   All you need to do is call 
`windowStore.backwardsFetch(...)` and if the underlying store doesn't support 
it, then it will throw UnsupportedOperationException. You don't need to use 
reflection/`getMethod` . Also, if we're ever in a position of catching 
SecurityException, something has probably gone wrong
   3) Originally I was thinking we should do this in `init` so we don't have to 
figure out if it's a reverse store every time a new record gets processed. But 
I just realized that all of the SessionStore fetch methods require a key, so we 
have to do this in `process` (since we don't have a key to pass in during 
`init`, and null keys aren't allowed). We can at least just do it once in the 
first `process`, and then keep track of whether we should use forwards or 
reverse iteration in subsequent ones
   
   Given the above (especially 3), there's no perfect solution, but one thing 
we can do is just keep a `reverseIterationPossible` boolean. If it's false we 
call `processInOrder`, if it's true we call `processReverse`. We also put a 
`catch UnsupportedOperationException` around the `processReverse` call, so if 
it does throw on the first invocation of `process` then we can call 
`processInOrder` and also set `reverseIterationPossible` to false so that we 
never call `processReverse` again. Does that make sense?
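
   A minimal self-contained sketch of that fallback (my illustration only; `processReverse`/`processInOrder` are stand-ins for this PR's methods, not Kafka code):

   ```java
// Assume reverse iteration works until the store proves otherwise, then
// permanently fall back to forward processing.
public class ReverseFallbackSketch<K, V> {

    private boolean reverseIterationPossible = true;

    public void process(final K key, final V value, final long timestamp) {
        if (reverseIterationPossible) {
            try {
                processReverse(key, value, timestamp);
                return;
            } catch (final UnsupportedOperationException e) {
                // The store has no backward iteration; remember that so we
                // never attempt processReverse (or pay for the exception) again.
                reverseIterationPossible = false;
            }
        }
        processInOrder(key, value, timestamp);
    }

    private void processReverse(final K key, final V value, final long timestamp) {
        // would call windowStore.backwardFetch(...) here
        throw new UnsupportedOperationException("store does not support backward iteration");
    }

    private void processInOrder(final K key, final V value, final long timestamp) {
        // would call windowStore.fetch(...) here
    }
}
   ```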





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10456) wrong description in kafka-console-producer.sh help

2020-09-01 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-10456:
--

Assignee: huxihx

> wrong description in kafka-console-producer.sh help
> ---
>
> Key: KAFKA-10456
> URL: https://issues.apache.org/jira/browse/KAFKA-10456
> Project: Kafka
>  Issue Type: Task
>  Components: producer 
>Affects Versions: 2.6.0
> Environment: linux
>Reporter: danilo batista queiroz
>Assignee: huxihx
>Priority: Trivial
>  Labels: documentation
>   Original Estimate: 1m
>  Remaining Estimate: 1m
>
> file: core/src/main/scala/kafka/tools/ConsoleProducer.scala
> In line 151, the description of "message-send-max-retries" contains the text 
> 'retires'; the correct word is 'retries'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r481518121



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -118,24 +121,28 @@ public void process(final K key, final V value) {
 }
 
 final long timestamp = context().timestamp();
-//don't process records that don't fall within a full sliding 
window
-if (timestamp < windows.timeDifferenceMs()) {
-log.warn(
-"Skipping record due to early arrival. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
-value, context().topic(), context().partition(), 
context().offset()
-);
-droppedRecordsSensor.record();
-return;
+Method backwardsIterator = null;
+
+try {
+windowStore.getClass().getMethod("backwardFetch", new Class[] 
{ Object.class, Object.class, Instant.class, Instant.class });
+} catch (NoSuchMethodException | SecurityException e)  { }
+if (backwardsIterator != null) {
+processReverse(key, value, timestamp);
+} else {
+processInOrder(key, value, timestamp);

Review comment:
   1) I'm not sure I understand the usage of `backwardsIterator` here. Do 
we ever set it to anything?
   2) I think you're overcomplicating this   All you need to do is call 
`windowStore.backwardsFetch(...)` and if the underlying store doesn't support 
it, then it will throw UnsupportedOperationException. You don't need to use 
reflection/`getMethod` . Also, if we're ever in a position of catching 
SecurityException, something has probably gone wrong
   3) Originally I was thinking we should do this in `init` so we don't have to 
figure out if it's a reverse store on every iteration, but I just realized that 
all of the SessionStore fetch methods require a key, so we have to do this in 
`process`. We can at least just do it once in the first `process`, and then 
keep track of whether we should use forwards or reverse iteration in subsequent 
ones
   
   Given the above (especially 3), there's no perfect solution, but one thing 
we can do is just keep a `reverseIterationPossible` boolean. If it's false we 
call `processInOrder`, if it's true we call `processReverse`. We also put a 
`catch UnsupportedOperationException` around the `processReverse` call, so if 
it does throw on the first invocation of `process` then we can call 
`processInOrder` and also set `reverseIterationPossible` to false so that we 
never call `processReverse` again. Does that make sense?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481512388



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -118,24 +118,20 @@ public void process(final K key, final V value) {
 }
 
 final long timestamp = context().timestamp();
-//don't process records that don't fall within a full sliding 
window
+final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+

Review comment:
   By the way, I think we should also check if the record is so old that 
even the latest window it could possibly create/affect would be dropped, and 
then not process the record at all. (ie basically check if the current record's 
right window would be dropped) We can record on the lateRecordDropSensor and 
log the message using the current record's left window. 
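
   In other words, something like this check up front (my paraphrase, with the window math stated as an assumption rather than the PR's final code): the newest window a record at time `t` can affect is its right window `[t + 1, t + 1 + timeDifferenceMs]`, so if even that window is already past the close time, the record can be dropped immediately.

   ```java
public final class EarlyDropCheck {

    // Returns true if even the record's right window (the latest window it
    // could create) would already be closed, so the record can be skipped.
    public static boolean recordTooOld(final long timestamp,
                                       final long timeDifferenceMs,
                                       final long observedStreamTime,
                                       final long gracePeriodMs) {
        final long closeTime = observedStreamTime - gracePeriodMs;
        final long latestPossibleWindowEnd = timestamp + 1 + timeDifferenceMs;
        return latestPossibleWindowEnd < closeTime;
    }

    public static void main(String[] args) {
        // A record 2 minutes behind stream time, with a 30s window and 10s grace, is droppable.
        System.out.println(recordTooOld(0L, 30_000L, 120_000L, 10_000L)); // true
    }
}
   ```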





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on pull request #7877: KAFKA-9312: Wait for splitted batches to be processed after a KafkaProducer#flush()

2020-09-01 Thread GitBox


d8tltanc commented on pull request #7877:
URL: https://github.com/apache/kafka/pull/7877#issuecomment-685195411


   Hi @jonathansantilli . Are you still working on this PR? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-09-01 Thread Jerry Wei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188870#comment-17188870
 ] 

Jerry Wei commented on KAFKA-10134:
---

[~guozhang] Thanks, I've tested the client changes in PR #8834 against a legacy 
Kafka server, and it works fine as well. Thanks for your response.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.5.2, 2.6.1
>
> Attachments: consumer3.log.2020-08-20.log, 
> consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we try to kill an instance to trigger a rebalance 
> while there is some load (some are long-running tasks >30s), the CPU 
> goes sky-high. It reads ~700% in our metrics, so there must be several threads 
> in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible with both the 
> new CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol the high 
> CPU usage dropped after the rebalance was done, but when using the cooperative 
> one, the consumer threads seem to be stuck on something and can't 
> finish the rebalance, so the high CPU usage won't drop until we stop our 
> load. Also, a small load without long-running tasks won't cause continuous 
> high CPU usage, as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> trying to find the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which doesn't seem to have this issue, so it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members

2020-09-01 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188864#comment-17188864
 ] 

Sophie Blee-Goldman commented on KAFKA-10455:
-

Yeah ok, that sounds reasonable. Initially I was thinking that we should avoid 
relying on any kind of in-memory counter to enforce a change in the userdata, 
to avoid losing the counter in the event of a restart and having it get 
initialized back to the previous value. However, presumably upon rejoining 
after a restart the member would send in a new subscription, and the broker 
would save that as the latest subscription userdata, so incrementing the 
counter upon every subscription should be ok. 

I'll try to look around in the broker code to verify that it actually updates 
the member's subscription metadata, even if eg static membership is used
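
To picture the counter idea (purely illustrative; this is not Streams' actual subscription encoding), the point is just that appending a monotonically increasing value makes two consecutive subscriptions never byte-equal:

{code:java}
import java.nio.ByteBuffer;

// Illustrative only: fold a per-member counter into the subscription userdata
// so consecutive JoinGroup requests never carry identical bytes.
public class ChangingUserDataSketch {

    private int rejoinCounter = 0;

    public ByteBuffer encodeUserData(final byte[] taskOffsetSums) {
        final ByteBuffer buffer = ByteBuffer.allocate(taskOffsetSums.length + Integer.BYTES);
        buffer.put(taskOffsetSums);
        buffer.putInt(rejoinCounter++);   // changes on every (re)subscription
        buffer.flip();
        return buffer;
    }
}
{code}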

> Probing rebalances are not guaranteed to be triggered by non-leader members
> ---
>
> Key: KAFKA-10455
> URL: https://issues.apache.org/jira/browse/KAFKA-10455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Apparently, if a consumer rejoins the group with the same subscription 
> userdata that it previously sent, it will not trigger a rebalance. The one 
> exception here is that the group leader will always trigger a rebalance when 
> it rejoins the group.
> This has implications for KIP-441, where we rely on asking an arbitrary 
> thread to enforce the followup probing rebalances. Technically we do ask a 
> thread living on the same instance as the leader, so the odds that the leader 
> will be chosen aren't completely abysmal, but for any multithreaded 
> application they are still at best only 50%.
> Of course in general the userdata will have changed within a span of 10 
> minutes, so the actual likelihood of hitting this is much lower –  it can 
> only happen if the member's task offset sums remained unchanged. 
> Realistically, this probably requires that the member only have 
> fully-restored active tasks (encoded with the constant sentinel -2) and that 
> no tasks be added or removed.
>  
> One solution would be to make sure the leader is responsible for the probing 
> rebalance. To do this, we would need to somehow expose the memberId of the 
> thread's main consumer to the partition assignor. I'm actually not sure if 
> that's currently possible to figure out or not. If not, we could just assign 
> the probing rebalance to every thread on the leader's instance. This 
> shouldn't result in multiple followup rebalances as the rebalance schedule 
> will be updated/reset on the first followup rebalance.
> Another solution would be to make sure the userdata is always different. We 
> could encode an extra bit that flip-flops, but then we'd have to persist the 
> latest value somewhere/somehow. Alternatively we could just encode the next 
> probing rebalance time in the subscription userdata, since that is guaranteed 
> to always be different from the previous rebalance. This might get tricky 
> though, and certainly wastes space in the subscription userdata. Also, this 
> would only solve the problem for KIP-441 probing rebalances, meaning we'd 
> have to individually ensure the userdata has changed for every type of 
> followup rebalance (see related issue below). So the first proposal, 
> requiring the leader trigger the rebalance, would be preferable.
> Note that, imho, we should just allow anyone to trigger a rebalance by 
> rejoining the group. But this would presumably require a broker-side change 
> and thus we would still need a workaround for KIP-441 to work with brokers.
>  
> Related issue:
> This also means the Streams workaround for [KAFKA-9821|https://issues.apache.org/jira/browse/KAFKA-9821] is 
> not airtight, as we encode the followup rebalance in the member who is 
> supposed to _receive_ a revoked partition, rather than the member who is 
> actually revoking said partition. While the member doing the revoking will be 
> guaranteed to have different userdata, the member receiving the partition may 
> not. Making it the responsibility of the leader to trigger _any_ type of 
> followup rebalance would solve this issue as well.
> Note that other types of followup rebalance (version probing, static 
> membership with host info change) are guaranteed to have a change in the 
> subscription userdata, and will not hit this bug



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-09-01 Thread GitBox


cmccabe commented on pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#issuecomment-685182597


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #9208: Minor init singleton list

2020-09-01 Thread GitBox


mumrah commented on pull request #9208:
URL: https://github.com/apache/kafka/pull/9208#issuecomment-685175756


   @bbejeck update this branch with trunk and the PR builder should start 
working



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9208: Minor init singleton list

2020-09-01 Thread GitBox


bbejeck commented on pull request #9208:
URL: https://github.com/apache/kafka/pull/9208#issuecomment-685173764


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481460413



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +190,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp<Agg> valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create the right 

[jira] [Created] (KAFKA-10456) wrong description in kafka-console-producer.sh help

2020-09-01 Thread danilo batista queiroz (Jira)
danilo batista queiroz created KAFKA-10456:
--

 Summary: wrong description in kafka-console-producer.sh help
 Key: KAFKA-10456
 URL: https://issues.apache.org/jira/browse/KAFKA-10456
 Project: Kafka
  Issue Type: Task
  Components: producer 
Affects Versions: 2.6.0
 Environment: linux
Reporter: danilo batista queiroz


file: core/src/main/scala/kafka/tools/ConsoleProducer.scala

In line 151, the description of "message-send-max-retries" contains the text 
'retires'; the correct word is 'retries'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah merged pull request #9238: KAFKA-10444: Configure PR builds via Jenkinsfile

2020-09-01 Thread GitBox


mumrah merged pull request #9238:
URL: https://github.com/apache/kafka/pull/9238


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-09-01 Thread GitBox


vvcephei commented on pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#issuecomment-685152475


   Thanks so much for the contribution and your patience with our long 
discussion, @LMnet !



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-09-01 Thread GitBox


vvcephei merged pull request #8955:
URL: https://github.com/apache/kafka/pull/8955


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-09-01 Thread GitBox


vvcephei commented on a change in pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#discussion_r481444344



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
##
@@ -43,12 +43,6 @@ object Serdes extends LowPrioritySerdes {
   implicit def javaIntegerSerde: Serde[java.lang.Integer] = JSerdes.Integer()
   implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()
 
-  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): 
WindowedSerdes.TimeWindowedSerde[T] =
-new WindowedSerdes.TimeWindowedSerde[T](tSerde)
-
-  implicit def sessionWindowedSerde[T](implicit tSerde: Serde[T]): 
WindowedSerdes.SessionWindowedSerde[T] =
-new WindowedSerdes.SessionWindowedSerde[T](tSerde)

Review comment:
   Adding it back will also unblock the build, which fails right now 
because `WindowedSerdes` is an unused import. I'll just add this back and push 
to your branch, assuming the rest of the tests pass, so I can go ahead and 
merge it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-09-01 Thread GitBox


vvcephei commented on a change in pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#discussion_r481441613



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
##
@@ -43,12 +43,6 @@ object Serdes extends LowPrioritySerdes {
   implicit def javaIntegerSerde: Serde[java.lang.Integer] = JSerdes.Integer()
   implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()
 
-  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): 
WindowedSerdes.TimeWindowedSerde[T] =
-new WindowedSerdes.TimeWindowedSerde[T](tSerde)
-
-  implicit def sessionWindowedSerde[T](implicit tSerde: Serde[T]): 
WindowedSerdes.SessionWindowedSerde[T] =
-new WindowedSerdes.SessionWindowedSerde[T](tSerde)

Review comment:
   Ah, we don't technically need to drop this one, only the 
`timeWindowedSerde`. Sorry for the confusion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-09-01 Thread GitBox


vvcephei commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r481373305



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
 resetJoinGroupFuture();
 needsJoinPrepare = true;
 } else {
-log.info("Generation data was cleared by heartbeat thread. 
Initiating rejoin.");
+log.info("Generation data was cleared by heartbeat thread 
to {} and state is now {} before " +
+ "the rebalance callback is triggered, marking this 
rebalance as failed and retry",
+ generation, state);
 resetStateAndRejoin();

Review comment:
   Should we also reset the generation here? With the new condition above, 
we may now enter this block if generation is _not_ `NO_GENERATION`. I'm not 
sure if we want to have the generation set to some value but state set to 
`UNJOINED` and `rejoinNeeded := true`.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
 resetJoinGroupFuture();
 needsJoinPrepare = true;
 } else {
-log.info("Generation data was cleared by heartbeat thread. 
Initiating rejoin.");
+log.info("Generation data was cleared by heartbeat thread 
to {} and state is now {} before " +
+ "the rebalance callback is triggered, marking this 
rebalance as failed and retry",
+ generation, state);
 resetStateAndRejoin();
 resetJoinGroupFuture();
-return false;
 }
 } else {
 final RuntimeException exception = future.exception();
-log.info("Join group failed with {}", exception.toString());
+log.info("Rebalance failed with {}", exception.toString());

Review comment:
   Different question: can we report the exception as the "cause", rather 
than just getting the toString of it?
   ```suggestion
   log.info("Rebalance failed.", exception);
   ```

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -433,7 +440,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 generationSnapshot = this.generation;
 }
 
-if (generationSnapshot != Generation.NO_GENERATION) {
+if (generationSnapshot != Generation.NO_GENERATION && state == 
MemberState.STABLE) {

Review comment:
   Since the state can also be set from the heartbeat thread, do you think 
it would be a good idea to also get a "stateSnapshot" inside the synchronized 
block at L439 so that the state and generation are consistent wrt each other?
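
   Something along these lines, as a standalone sketch of the idea (simplified stand-ins for the real AbstractCoordinator fields, not the actual code):

   ```java
public class SnapshotSketch {

    enum MemberState { UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    static final class Generation {
        static final Generation NO_GENERATION = new Generation(-1);
        final int generationId;
        Generation(final int generationId) { this.generationId = generationId; }
    }

    private final Object lock = new Object();
    private Generation generation = Generation.NO_GENERATION;
    private MemberState state = MemberState.UNJOINED;

    boolean readyToCompleteJoin() {
        final Generation generationSnapshot;
        final MemberState stateSnapshot;
        // Take both snapshots under the same lock so they are consistent with
        // respect to concurrent resets from the heartbeat thread.
        synchronized (lock) {
            generationSnapshot = this.generation;
            stateSnapshot = this.state;
        }
        return generationSnapshot != Generation.NO_GENERATION && stateSnapshot == MemberState.STABLE;
    }
}
   ```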

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -652,10 +644,10 @@ public void handle(JoinGroupResponse joinResponse, 
RequestFuture fut
 } else if (error == Errors.MEMBER_ID_REQUIRED) {
 // Broker requires a concrete member id to be allowed to join 
the group. Update member id
 // and send another join group request in next cycle.
+String memberId = joinResponse.data().memberId();
+log.debug("Attempt to join group returned {} error. Will set 
the member id as {} and then rejoin", error, memberId);
 synchronized (AbstractCoordinator.this) {
-AbstractCoordinator.this.generation = new 
Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
-joinResponse.data().memberId(), null);
-AbstractCoordinator.this.resetStateAndRejoin();

Review comment:
   We don't need to reset the state here anymore?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
##
@@ -66,12 +73,16 @@ void sentHeartbeat(long now) {
 heartbeatInFlight = true;
 update(now);
 heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
+
+log.trace("Sending heartbeat request with {}ms remaining on timer", 
heartbeatTimer.remainingMs());

Review comment:
   Should this be inside `isTraceEnabled()` to avoid computing 
`remainingMs()` in the case that trace logging isn't on?
   
   ```suggestion
   if (log.isTraceEnabled()) {
   log.trace("Sending heartbeat request with {}ms remaining on 
timer", heartbeatTimer.remainingMs());
   }
   ```
   
   (also below)





[GitHub] [kafka] chia7712 commented on a change in pull request #9238: KAFKA-10444: Configure PR builds via Jenkinsfile

2020-09-01 Thread GitBox


chia7712 commented on a change in pull request #9238:
URL: https://github.com/apache/kafka/pull/9238#discussion_r481436430



##
File path: build.gradle
##
@@ -336,6 +337,7 @@ subprojects {
 
   task integrationTest(type: Test, dependsOn: compileJava) {
 maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
+ignoreFailures = userIgnoreFailures

Review comment:
   Should this new option be supported by *test task*?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-09-01 Thread GitBox


cmccabe commented on pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#issuecomment-685130207


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9238: KAFKA-10444: Configure PR builds via Jenkinsfile

2020-09-01 Thread GitBox


chia7712 commented on a change in pull request #9238:
URL: https://github.com/apache/kafka/pull/9238#discussion_r481417954



##
File path: Jenkinsfile
##
@@ -0,0 +1,169 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+def setupGradle() {
+  // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167
+  dir('.gradle') {
+deleteDir()
+  }
+  sh './gradlew -version'
+}
+
+def doValidation() {
+  sh '''
+./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala 
compileTestJava compileTestScala \
+spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \
+--profile --no-daemon --continue -PxmlSpotBugsReport=true
+  '''
+}
+
+def doTest() {
+  sh '''
+./gradlew -PscalaVersion=$SCALA_VERSION unitTest integrationTest \
+--profile --no-daemon --continue 
-PtestLoggingEvents=started,passed,skipped,failed \
+-PignoreFailures=true -PmaxParallelForks=2 -PmaxTestRetries=1 
-PmaxTestRetryFailures=5
+  '''
+  junit '**/build/test-results/**/TEST-*.xml'
+}
+
+def doStreamsArchetype() {
+  echo 'Verify that Kafka Streams archetype compiles'
+
+  sh '''
+./gradlew streams:install clients:install connect:json:install 
connect:api:install \
+ || { echo 'Could not install kafka-streams.jar (and dependencies) 
locally`'; exit 1; }
+  '''
+
+  sh '''
+version=`grep "^version=" gradle.properties | cut -d= -f 2` \
+|| { echo 'Could not get version from `gradle.properties`'; exit 1; }
+  '''
+
+  dir('streams/quickstart') {
+sh '''
+  mvn clean install -Dgpg.skip  \
+  || { echo 'Could not `mvn install` streams quickstart archetype'; 
exit 1; }
+'''
+
+sh '''
+  mkdir test-streams-archetype && cd test-streams-archetype \
+  || { echo 'Could not create test directory for stream quickstart 
archetype'; exit 1; }
+'''
+
+sh '''
+  echo "Y" | mvn archetype:generate \
+  -DarchetypeCatalog=local \
+  -DarchetypeGroupId=org.apache.kafka \
+  -DarchetypeArtifactId=streams-quickstart-java \
+  -DarchetypeVersion=$version \
+  -DgroupId=streams.examples \
+  -DartifactId=streams.examples \
+  -Dversion=0.1 \
+  -Dpackage=myapps \
+  || { echo 'Could not create new project using streams quickstart 
archetype'; exit 1; }
+'''
+
+dir('streams.examples') {
+  sh '''
+mvn compile \
+|| { echo 'Could not compile streams quickstart archetype 
project'; exit 1; }
+  '''
+}
+  }
+}
+
+def tryStreamsArchetype() {
+  try {
+doStreamsArchetype()
+  } catch(err) {
+echo 'Failed to build Kafka Streams archetype, marking this build UNSTABLE'
+currentBuild.result = 'UNSTABLE'
+  }
+}
+
+
+pipeline {
+  agent none
+  stages {
+stage('Build') {
+  parallel {
+stage('JDK 8') {
+  agent { label 'ubuntu' }
+  tools {
+jdk 'JDK 1.8 (latest)'
+maven 'Maven 3.6.3'
+  }
+  options {
+timeout(time: 8, unit: 'HOURS') 
+timestamps()
+  }
+  environment {
+SCALA_VERSION=2.12
+  }
+  steps {
+setupGradle()
+doValidation()
+doTest()
+tryStreamsArchetype()
+  }
+}
+
+stage('JDK 11') {
+  agent { label 'ubuntu' }
+  tools {
+jdk 'JDK 11 (latest)'
+  }
+  options {
+timeout(time: 8, unit: 'HOURS') 
+timestamps()
+  }
+  environment {
+SCALA_VERSION=2.13
+  }
+  steps {
+setupGradle()
+doValidation()
+doTest()
+echo 'Skipping Kafka Streams archetype test for Java 11'
+  }
+}
+   
+stage('JDK 15') {
+  agent { label 'ubuntu' }
+  tools {
+jdk 'JDK 15 (latest)'
+  }
+  options {
+timeout(time: 8, unit: 'HOURS') 
+timestamps()
+  }
+  environment {
+SCALA_VERSION=2.13
+  }
+  steps {
+

[jira] [Commented] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members

2020-09-01 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188804#comment-17188804
 ] 

John Roesler commented on KAFKA-10455:
--

Thanks, [~ableegoldman]. This is a bummer.

It seems like when we want to force a rebalance, we just have to make sure the 
user-data changes in some way. IIUC, each consumer gets a PartitionAssignor 
instance with the same lifecycle as the consumer itself. Therefore, we can 
initialize a single `byte`, append it to the subscription userdata, and ignore 
it when deserializing. We can increment it for each forced rebalance and let it 
roll over.
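
A minimal sketch of that idea (the class and method names are invented for illustration, not actual Streams code):

{code:java}
import java.nio.ByteBuffer;

// Hypothetical sketch: keep one byte per assignor instance (same lifecycle as the
// consumer), append it to whatever userdata is already encoded, and bump it whenever
// we want to force a rebalance. The receiving side simply drops the trailing byte,
// so it never affects assignment logic; it only guarantees the encoded subscription
// differs from the previous one.
public final class UserDataVersionTag {

    private byte counter = 0;

    public ByteBuffer tag(final ByteBuffer realUserData) {
        final ByteBuffer tagged = ByteBuffer.allocate(realUserData.remaining() + 1);
        tagged.put(realUserData.duplicate());
        tagged.put(counter++); // rolling over is fine, we only need "different from last time"
        tagged.flip();
        return tagged;
    }

    public static ByteBuffer stripTag(final ByteBuffer tagged) {
        final ByteBuffer withoutTag = tagged.duplicate();
        withoutTag.limit(withoutTag.limit() - 1); // ignore the trailing counter byte
        return withoutTag.slice();
    }
}
{code}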

> Probing rebalances are not guaranteed to be triggered by non-leader members
> ---
>
> Key: KAFKA-10455
> URL: https://issues.apache.org/jira/browse/KAFKA-10455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Apparently, if a consumer rejoins the group with the same subscription 
> userdata that it previously sent, it will not trigger a rebalance. The one 
> exception here is that the group leader will always trigger a rebalance when 
> it rejoins the group.
> This has implications for KIP-441, where we rely on asking an arbitrary 
> thread to enforce the followup probing rebalances. Technically we do ask a 
> thread living on the same instance as the leader, so the odds that the leader 
> will be chosen aren't completely abysmal, but for any multithreaded 
> application they are still at best only 50%.
> Of course in general the userdata will have changed within a span of 10 
> minutes, so the actual likelihood of hitting this is much lower –  it can 
> only happen if the member's task offset sums remained unchanged. 
> Realistically, this probably requires that the member only have 
> fully-restored active tasks (encoded with the constant sentinel -2) and that 
> no tasks be added or removed.
>  
> One solution would be to make sure the leader is responsible for the probing 
> rebalance. To do this, we would need to somehow expose the memberId of the 
> thread's main consumer to the partition assignor. I'm actually not sure if 
> that's currently possible to figure out or not. If not, we could just assign 
> the probing rebalance to every thread on the leader's instance. This 
> shouldn't result in multiple followup rebalances as the rebalance schedule 
> will be updated/reset on the first followup rebalance.
> Another solution would be to make sure the userdata is always different. We 
> could encode an extra bit that flip-flops, but then we'd have to persist the 
> latest value somewhere/somehow. Alternatively we could just encode the next 
> probing rebalance time in the subscription userdata, since that is guaranteed 
> to always be different from the previous rebalance. This might get tricky 
> though, and certainly wastes space in the subscription userdata. Also, this 
> would only solve the problem for KIP-441 probing rebalances, meaning we'd 
> have to individually ensure the userdata has changed for every type of 
> followup rebalance (see related issue below). So the first proposal, 
> requiring the leader trigger the rebalance, would be preferable.
> Note that, imho, we should just allow anyone to trigger a rebalance by 
> rejoining the group. But this would presumably require a broker-side change 
> and thus we would still need a workaround for KIP-441 to work with brokers.
>  
> Related issue:
> This also means the Streams workaround for [KAFKA-9821|http://example.com] is 
> not airtight, as we encode the followup rebalance in the member who is 
> supposed to _receive_ a revoked partition, rather than the member who is 
> actually revoking said partition. While the member doing the revoking will be 
> guaranteed to have different userdata, the member receiving the partition may 
> not. Making it the responsibility of the leader to trigger _any_ type of 
> followup rebalance would solve this issue as well.
> Note that other types of followup rebalance (version probing, static 
> membership with host info change) are guaranteed to have a change in the 
> subscription userdata, and will not hit this bug



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-01 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481391993



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+}
+}
+
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp rightWinAgg = null;
+//window from [0,timeDifference] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (next.value.timestamp() < timestamp) {
+previousRecordTimestamp = next.value.timestamp();
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+if (combinedWindow == null) {
+final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+} else {
+//create the right window for the previous record if the 
previous record exists and the window hasn't already been created
+

[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-01 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481378201



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+}
+}
+
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp rightWinAgg = null;
+//window from [0,timeDifference] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (next.value.timestamp() < timestamp) {
+previousRecordTimestamp = next.value.timestamp();
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+if (combinedWindow == null) {
+final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+} else {
+//create the right window for the previous record if the 
previous record exists and the window hasn't already been created
+

[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-01 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481375834



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -164,11 +161,13 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 
 if (endTime < timestamp) {
 leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
+// update to store the previous record

Review comment:
   I'm not sure we need to check that; I think that by the nature of going 
through windows forward, the next window we find will always have a max 
timestamp larger than the previous window's. Right? Lemme do an example:
   A record comes in @30, the previous record was at @23, timeDifference = 10. The 
last window we find with an endTime < timestamp will be 23's left window, where 
the max record timestamp is 23. Any earlier windows with endTime < timestamp will 
have a max timestamp less than 23, so we can safely override them.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481338030



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -164,11 +161,13 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 
 if (endTime < timestamp) {
 leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
+// update to store the previous record

Review comment:
   This comment doesn't really add anything, it just describes what the 
code says. Also, don't we need to check that   `windowMaxTimestamp > 
previousRecordTimestamp` before updating `previousRecordTimestamp` (where 
`windowMaxTimestamp = next.value.timestamp` -- it would be nice to assign this 
to a variable with an explicit name to make it clear what 
`next.value.timestamp` actually means).
   Same goes for the below, I guess you could just put the check in a 
`maybeUpdatePreviousRecordTimestamp()` method and call it from both places
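
   A self-contained sketch of what that helper could look like (the class wrapper and field are placeholders for the processor's own state, not the actual code):
   ```java
   // Rough sketch of the suggested helper (names are placeholders): the tracked
   // timestamp only ever moves forward, so both call sites can invoke it
   // unconditionally instead of repeating the comparison inline.
   final class PreviousRecordTracker {
       private Long previousRecordTimestamp = null;

       void maybeUpdatePreviousRecordTimestamp(final long windowMaxTimestamp) {
           if (previousRecordTimestamp == null || windowMaxTimestamp > previousRecordTimestamp) {
               previousRecordTimestamp = windowMaxTimestamp;
           }
       }

       Long previousRecordTimestamp() {
           return previousRecordTimestamp;
       }
   }
   ```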

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -146,13 +142,14 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean leftWinAlreadyCreated = false;
 boolean rightWinAlreadyCreated = false;
 
-// keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+// Store the previous record
+Long previousRecord = null;

Review comment:
   Use `previousRecordTimestamp` like in `processEarly`.  You can probably 
remove the comment then

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+}
+}
+
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp rightWinAgg = null;
+//window from [0,timeDifference] that holds all early records

Review comment:
   ```suggestion
   // A window from [0, timeDifferenceMs] that holds all early 
records
   ```
   Also I'd suggest putting the `combinedWindow` declaration (and comment) 
above `rightWinAgg` to avoid ambiguity in what the comment refers to 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window 

[jira] [Assigned] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2020-09-01 Thread Sharath Bhat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sharath Bhat reassigned KAFKA-6579:
---

Assignee: (was: Sharath Bhat)

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members

2020-09-01 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10455:
---

 Summary: Probing rebalances are not guaranteed to be triggered by 
non-leader members
 Key: KAFKA-10455
 URL: https://issues.apache.org/jira/browse/KAFKA-10455
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Sophie Blee-Goldman


Apparently, if a consumer rejoins the group with the same subscription userdata 
that it previously sent, it will not trigger a rebalance. The one exception 
here is that the group leader will always trigger a rebalance when it rejoins 
the group.

This has implications for KIP-441, where we rely on asking an arbitrary thread 
to enforce the followup probing rebalances. Technically we do ask a thread 
living on the same instance as the leader, so the odds that the leader will be 
chosen aren't completely abysmal, but for any multithreaded application they 
are still at best only 50%.

Of course in general the userdata will have changed within a span of 10 
minutes, so the actual likelihood of hitting this is much lower –  it can only 
happen if the member's task offset sums remained unchanged. Realistically, this 
probably requires that the member only have fully-restored active tasks 
(encoded with the constant sentinel -2) and that no tasks be added or removed.

 

One solution would be to make sure the leader is responsible for the probing 
rebalance. To do this, we would need to somehow expose the memberId of the 
thread's main consumer to the partition assignor. I'm actually not sure if 
that's currently possible to figure out or not. If not, we could just assign 
the probing rebalance to every thread on the leader's instance. This shouldn't 
result in multiple followup rebalances as the rebalance schedule will be 
updated/reset on the first followup rebalance.

Another solution would be to make sure the userdata is always different. We 
could encode an extra bit that flip-flops, but then we'd have to persist the 
latest value somewhere/somehow. Alternatively we could just encode the next 
probing rebalance time in the subscription userdata, since that is guaranteed 
to always be different from the previous rebalance. This might get tricky 
though, and certainly wastes space in the subscription userdata. Also, this 
would only solve the problem for KIP-441 probing rebalances, meaning we'd have 
to individually ensure the userdata has changed for every type of followup 
rebalance (see related issue below). So the first proposal, requiring the 
leader trigger the rebalance, would be preferable.

Note that, imho, we should just allow anyone to trigger a rebalance by 
rejoining the group. But this would presumably require a broker-side change and 
thus we would still need a workaround for KIP-441 to work with brokers.

 

Related issue:
This also means the Streams workaround for [KAFKA-9821|http://example.com] is 
not airtight, as we encode the followup rebalance in the member who is supposed 
to _receive_ a revoked partition, rather than the member who is actually 
revoking said partition. While the member doing the revoking will be guaranteed 
to have different userdata, the member receiving the partition may not. Making 
it the responsibility of the leader to trigger _any_ type of followup rebalance 
would solve this issue as well.

Note that other types of followup rebalance (version probing, static membership 
with host info change) are guaranteed to have a change in the subscription 
userdata, and will not hit this bug



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-4279) REST endpoint to list converter plugins

2020-09-01 Thread Rupesh Kumar Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rupesh Kumar Patel reassigned KAFKA-4279:
-

Assignee: Rupesh Kumar Patel

> REST endpoint to list converter plugins
> ---
>
> Key: KAFKA-4279
> URL: https://issues.apache.org/jira/browse/KAFKA-4279
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Assignee: Rupesh Kumar Patel
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We have a REST resource that allows users to see the available plugins, but 
> we have no equivalent that allows listing available converters.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9440) Add ConsumerGroupCommand to delete static members

2020-09-01 Thread Sandeep Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandeep Kumar reassigned KAFKA-9440:


Assignee: Sandeep Kumar  (was: Mani Jindal)

> Add ConsumerGroupCommand to delete static members
> -
>
> Key: KAFKA-9440
> URL: https://issues.apache.org/jira/browse/KAFKA-9440
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Sandeep Kumar
>Priority: Major
>  Labels: help-wanted, kip, newbie, newbie++
>
> We introduced a new AdminClient API removeMembersFromConsumerGroup in 2.4. It 
> would be good to instantiate the API as part of the ConsumerGroupCommand for 
> easy command line usage. 
> This change requires a new KIP, and just posting out here in case anyone who 
> uses static membership to pick it up, if they would like to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lct45 opened a new pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-01 Thread GitBox


lct45 opened a new pull request #9239:
URL: https://github.com/apache/kafka/pull/9239


   Adding a `backwardFetch` call to the window store for sliding windows 
processing. While the implementation works with the forward fetch from the window 
store, using `backwardFetch` allows the iterator to be closed earlier, making the 
implementation more efficient.
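
   For illustration, a rough standalone sketch of the pattern (the key/value types and helper name are invented, not this PR's code):
   ```java
   import java.time.Instant;

   import org.apache.kafka.streams.kstream.Windowed;
   import org.apache.kafka.streams.state.KeyValueIterator;
   import org.apache.kafka.streams.state.ReadOnlyWindowStore;

   // With a backward iterator the newest window is returned first, so a caller that
   // only needs the most recent matching window can stop after one record instead
   // of scanning the whole time range forward before closing the iterator.
   final class NewestWindowLookup {
       static Windowed<String> newestWindow(final ReadOnlyWindowStore<String, Long> store,
                                            final String key,
                                            final long observedStreamTime) {
           try (final KeyValueIterator<Windowed<String>, Long> iterator =
                    store.backwardFetch(key, key, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(observedStreamTime))) {
               return iterator.hasNext() ? iterator.next().key : null;
           }
       }
   }
   ```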
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect

2020-09-01 Thread GitBox


rhauch commented on a change in pull request #8918:
URL: https://github.com/apache/kafka/pull/8918#discussion_r481251133



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -475,7 +475,7 @@ private synchronized void recordSent(final 
ProducerRecord record
 public boolean commitOffsets() {
 long commitTimeoutMs = 
workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
 
-log.info("{} Committing offsets", this);
+log.trace("{} Committing offsets", this);

Review comment:
   Why set this to `trace` rather than `debug` like the other changes? 
Isn't it useful to know that we've entered this method before we lock?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -571,7 +571,7 @@ public boolean commitOffsets() {
 finishSuccessfulFlush();
 long durationMillis = time.milliseconds() - started;
 recordCommitSuccess(durationMillis);
-log.info("{} Finished commitOffsets successfully in {} ms",
+log.trace("{} Finished commitOffsets successfully in {} ms",

Review comment:
   Same question here about `trace` vs `debug`, as denoted above. With 
debug logs, isn't it useful to know how long it took to commit all offsets?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2020-09-01 Thread Sharath Bhat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sharath Bhat reassigned KAFKA-6579:
---

Assignee: Sharath Bhat

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sharath Bhat
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-09-01 Thread GitBox


mumrah commented on pull request #9226:
URL: https://github.com/apache/kafka/pull/9226#issuecomment-684931052


   Closing in favor of #9238 for a clean slate



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah opened a new pull request #9238: KAFKA-10444 Add Jenkinsfile

2020-09-01 Thread GitBox


mumrah opened a new pull request #9238:
URL: https://github.com/apache/kafka/pull/9238


   This PR adds a Jenkinsfile to the build to replace the existing jenkins.sh 
script.
   
   The build makes use of the parallel directive to run the JDK 8, 11, and 15 
builds in parallel. Each build will report status to PRs in Github including 
which tests failed.
   
   Only users with write access to the repository (i.e., committers) will be 
able to modify Jenkinsfile through pull requests, otherwise the build will use 
the Jenkinsfile on the target branch.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah closed pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-09-01 Thread GitBox


mumrah closed pull request #9226:
URL: https://github.com/apache/kafka/pull/9226


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188534#comment-17188534
 ] 

John Roesler commented on KAFKA-10454:
--

Huh, thanks for the report and repro, [~lkokhreidze]!

I think that if we didn't do selectKey in there, then you _would_ get an 
exception, since we would check that the stream and table have the same number 
of partitions for the join. I'm guessing that the selectKey is inserting a 
repartition node, which is (correctly) configured to be co-partitioned with the 
table, but the source-changelog optimization is kicking in and ignoring that 
the repartition node and source topic have different numbers of partitions.
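
For concreteness, the topology shape being described looks roughly like this (topic names, types, and partition counts are made up):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// selectKey() changes the stream's key, so a repartition node is inserted before the
// stream-table join; the question is whether that repartition topic really ends up
// co-partitioned with the table's source topic.
final class StreamTableJoinShape {
    static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> stream = builder.stream("topic-a"); // e.g. 4 partitions
        final KTable<String, String> table = builder.table("topic-b");    // e.g. 1 partition

        stream.selectKey((key, value) -> value)                           // forces a repartition before the join
              .join(table, (streamValue, tableValue) -> streamValue + tableValue)
              .to("output");

        return builder.build();
    }
}
{code}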

If that sounds right to you, then the solution should be to add a 
partition-count check before applying the source-changelog optimization. Your 
workaround also appears to work because it disables the source-changelog 
optimization: the repartition() operator always forces a repartition node.

Thanks!

-John

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -
>
> Key: KAFKA-10454
> URL: https://issues.apache.org/jira/browse/KAFKA-10454
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
>
> Here's integration test: [https://github.com/apache/kafka/pull/9237]
>  
> From the first glance, issue is that when one joins stream to table, and 
> table source topic doesn't have same number of partitions as stream topic, 
> `StateChangelogReader` tries to recover state from changelog (which in this 
> case is the same as source topic) for table from partitions that don't exist. 
> Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO 

[GitHub] [kafka] mumrah commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-09-01 Thread GitBox


mumrah commented on a change in pull request #9226:
URL: https://github.com/apache/kafka/pull/9226#discussion_r481202138



##
File path: Jenkinsfile
##
@@ -0,0 +1,200 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+def setupGradle() {
+  // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167
+  dir('.gradle') {
+deleteDir()
+  }
+  sh './gradlew -version'
+}
+
+def doValidation() {
+  try {
+sh '''
+  ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala 
compileTestJava compileTestScala \
+  spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \
+  --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\"
+'''
+  } catch(err) {
+error('Validation checks failed, aborting this build')
+  }
+}
+
+def doTest() {
+  try {
+sh '''
+  ./gradlew -PscalaVersion=$SCALA_VERSION unitTest integrationTest \
+  --profile --no-daemon --continue 
-PtestLoggingEvents=started,passed,skipped,failed "$@"
+'''
+  } catch(err) {
+echo 'Some tests failed, marking this build UNSTABLE'
+currentBuild.result = 'UNSTABLE'

Review comment:
   Moving the junit into the stage seemed to work





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10448) Preserve Source Partition in Kafka Streams from context

2020-09-01 Thread satya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188447#comment-17188447
 ] 

satya commented on KAFKA-10448:
---

[~mjsax] I would like to submit a request for a KIP as you suggested. I need some 
guidance, please. I have requested access. I am a newbie here, but would like to 
contribute in any way possible.

> Preserve Source Partition in Kafka Streams from context
> ---
>
> Key: KAFKA-10448
> URL: https://issues.apache.org/jira/browse/KAFKA-10448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: satya
>Priority: Minor
>  Labels: needs-kip
>
> Currently, Kafka Streams sink nodes use the default partitioner or allow a custom 
> partitioner that has to depend on the key/value. I am looking for an enhancement of 
> the sink node to ensure the source partition is preserved instead of deriving the 
> partition again from the key/value. One of our use cases has producers with custom 
> partitioners that we don't have access to, as it is a third-party application. 
> Simply preserving the partition through context.partition() would be helpful.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-01 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r480997443



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -203,6 +203,10 @@ public String queryableStoreName() {
 processorSupplier,
 tableNode,
 builder);
+
+kTable.enableSendingOldValues();

Review comment:
   Switching to enabling via the filter.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10072) Kafkaconsumer is configured with different clientid parameters to obtain different results

2020-09-01 Thread victor (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188388#comment-17188388
 ] 

victor commented on KAFKA-10072:


Hey [~akumar] 
1. Throttle the topic to 1024 B via a quota on client.id=aa (a rough Admin API sketch is below)
2. Always produce to and consume from this topic with client.id=aa
3. Shut down all brokers
After that, the client with client.id=aa cannot consume data from this topic, even 
after the brokers are restarted.
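
For reference, the quota in step 1 can be set with the Admin API along these lines (a rough sketch; it requires KIP-546 support, i.e. Kafka 2.6+, and the bootstrap address is a placeholder taken from the issue):

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

// Sets a 1024 byte/sec consumer quota for client.id=aa.
public final class SetClientQuota {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:port");

        try (Admin admin = Admin.create(props)) {
            final ClientQuotaEntity entity =
                new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, "aa"));
            final ClientQuotaAlteration alteration = new ClientQuotaAlteration(
                entity,
                Collections.singletonList(new ClientQuotaAlteration.Op("consumer_byte_rate", 1024.0)));
            admin.alterClientQuotas(Collections.singletonList(alteration)).all().get();
        }
    }
}
{code}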

> Kafkaconsumer is configured with different clientid parameters to obtain 
> different results
> --
>
> Key: KAFKA-10072
> URL: https://issues.apache.org/jira/browse/KAFKA-10072
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0
> Environment: centos7.6 8C 32G
>Reporter: victor
>Assignee: Ankit Kumar
>Priority: Blocker
>
> kafka-console-consumer.sh --bootstrap-server host1:port --consumer-property 
> {color:#DE350B}client.id=aa{color} --from-beginning --topic topicA
> {color:#DE350B}There's no data
> {color}
> kafka-console-consumer.sh --bootstrap-server host1:port --consumer-property 
> {color:#DE350B}clientid=bb{color} --from-beginning --topic topicA
> {color:#DE350B}Successfully consume data{color}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-2939) Make AbstractConfig.logUnused() tunable for clients

2020-09-01 Thread Mani Jindal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mani Jindal reassigned KAFKA-2939:
--

Assignee: Mani Jindal

> Make AbstractConfig.logUnused() tunable for clients
> ---
>
> Key: KAFKA-2939
> URL: https://issues.apache.org/jira/browse/KAFKA-2939
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Guozhang Wang
>Assignee: Mani Jindal
>Priority: Major
>  Labels: newbie
>
> Today we always log unused configs in KafkaProducer / KafkaConsumer in their 
> constructors, however for some cases like Kafka Streams that make use of 
> these clients, other configs may be passed in to configure Partitioner / 
> Serializer classes, etc. So it would be better to make this function call 
> optional to avoid printing unnecessary and confusing WARN entries.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-2865) Improve Request API Error Code Documention

2020-09-01 Thread Mani Jindal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mani Jindal reassigned KAFKA-2865:
--

Assignee: Mani Jindal

> Improve Request API Error Code Documention
> --
>
> Key: KAFKA-2865
> URL: https://issues.apache.org/jira/browse/KAFKA-2865
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Mani Jindal
>Priority: Major
>  Labels: newbie
>
> Current protocol documentation 
> (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
>   contains a list of all the error codes possible through the Kafka request 
> API, but this is getting unwieldy to manage since error codes span different 
> request types and occasionally have slightly different semantics. It would be 
> nice to list the error codes for each API separately with request-specific 
> descriptions as well as suggested handling (when it makes sense). 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-6585) Consolidate duplicated logic on reset tools

2020-09-01 Thread Mani Jindal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mani Jindal reassigned KAFKA-6585:
--

Assignee: Mani Jindal

> Consolidate duplicated logic on reset tools
> ---
>
> Key: KAFKA-6585
> URL: https://issues.apache.org/jira/browse/KAFKA-6585
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Mani Jindal
>Priority: Minor
>  Labels: newbie
>
> The consumer reset tool and streams reset tool today share a lot of common 
> logic, such as resetting to a datetime etc. We can consolidate it into a 
> common class which directly depends on the admin client and simply let these 
> tools use the class.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9440) Add ConsumerGroupCommand to delete static members

2020-09-01 Thread Mani Jindal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mani Jindal reassigned KAFKA-9440:
--

Assignee: Mani Jindal

> Add ConsumerGroupCommand to delete static members
> -
>
> Key: KAFKA-9440
> URL: https://issues.apache.org/jira/browse/KAFKA-9440
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Mani Jindal
>Priority: Major
>  Labels: help-wanted, kip, newbie, newbie++
>
> We introduced a new AdminClient API removeMembersFromConsumerGroup in 2.4. It 
> would be good to instantiate the API as part of the ConsumerGroupCommand for 
> easy command line usage. 
> This change requires a new KIP, and just posting out here in case anyone who 
> uses static membership to pick it up, if they would like to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7327) The CPU memory of the Kafka master node continues to soar, does not recycle, and finally the service fails?

2020-09-01 Thread Mani Jindal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mani Jindal reassigned KAFKA-7327:
--

Assignee: (was: Mani Jindal)

> The CPU memory of the Kafka master node continues to soar, does not recycle, 
> and finally the service fails?
> ---
>
> Key: KAFKA-7327
> URL: https://issues.apache.org/jira/browse/KAFKA-7327
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.0
> Environment: linux centos7 
>Reporter: ruiliang
>Priority: Blocker
> Attachments: server.log.2018-08-23-10
>
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> XmlIpcRegSvc -> 172.18.58.184:60686 (CLOSE_WAIT): there are many ports stuck in 
> CLOSE_WAIT like this, and that address is the application (client) side. Why do 
> they keep waiting? Memory is not reclaimed either. My three servers each have 2 
> cores and 4 GB of memory. Kafka was previously given 3 GB; when memory ran out it 
> reported an off-heap (direct) memory overflow, so I limited Kafka to at most 2 GB, 
> but after the master node runs for a while it again reports heap and off-heap 
> memory overflow. `free -m` shows only about 100 MB of memory left, and I do not 
> know where the memory is going; the Kafka process takes up about 80% of the 
> memory and CPU is above 100%. What could be the reason? The configuration 
> parameters have been checked against the official website; even using all the 
> defaults does not work.
> ` 1772 liandong 20 0 6398984 2.146g 16112 S 101.3 58.0 93:59.72 
> /usr/local/jdk1.8/bin/java -Xmx2G -Xms1G -server -XX:+UseG1GC 
> -XX:+HeapDumpOnOutOfMemoryError -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent 
> -Djava.awt.headless=true -XX:MaxDirectMemorySize=512m 
> -Xloggc:/data/kafka/bin/../logs/kafkaSer+...`
> kafka server.log log
> `[2018-08-23 07:56:11,788] INFO [GroupCoordinator 0]: Stabilized group 
> consumer.web.log generation 268 (__consumer_offsets-24) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-08-23 07:56:12,054] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Java heap space
> [2018-08-23 07:56:13,846] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Java heap space
> [2018-08-23 07:56:15,673] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Direct buffer memory
>  at java.nio.Bits.reserveMemory(Bits.java:694)
>  at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
>  at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>  at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
>  at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>  at 
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
>  at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:145)
>  at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
>  at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
>  at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
>  at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:557)
>  at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:495)
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
>  at kafka.network.Processor.poll(SocketServer.scala:628)
>  at kafka.network.Processor.run(SocketServer.scala:545)
>  at java.lang.Thread.run(Thread.java:748)
> [2018-08-23 07:56:16,379] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Java heap space`
> 172.18.58.184:speedtrace (CLOSE_WAIT): 172.18.58.184 is the kafka client connection
> lsof -i | grep java 
> `java 1772 liandong 83u IPv4 7990697 0t0 TCP *:36145 (LISTEN)
> java 1772 liandong 84u IPv4 7990698 0t0 TCP *:9099 (LISTEN)
> java 1772 liandong 85u IPv4 7990701 0t0 TCP *:40745 (LISTEN)
> java 1772 liandong 100u IPv4 7990709 0t0 TCP 
> prod_data_kafka_2:44688->prod_data_zk:eforward (ESTABLISHED)
> java 1772 liandong 193u IPv4 7989816 0t0 TCP prod_data_kafka_2:XmlIpcRegSvc 
> (LISTEN)
> java 1772 liandong 224u IPv4 8019955 0t0 TCP 
> prod_data_kafka_2:9099->172.18.58.184:47430 (ESTABLISHED)
> java 1772 liandong 228u IPv4 8018733 0t0 TCP 
> prod_data_kafka_2:XmlIpcRegSvc->172.18.58.184:33032 (CLOSE_WAIT)
> java 1772 liandong 229u IPv4 

[jira] [Assigned] (KAFKA-7327) The CPU memory of the Kafka master node continues to soar, does not recycle, and finally the service fails?

2020-09-01 Thread Mani Jindal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mani Jindal reassigned KAFKA-7327:
--

Assignee: Mani Jindal

> The CPU memory of the Kafka master node continues to soar, does not recycle, 
> and finally the service fails?
> ---
>
> Key: KAFKA-7327
> URL: https://issues.apache.org/jira/browse/KAFKA-7327
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.0
> Environment: linux centos7 
>Reporter: ruiliang
>Assignee: Mani Jindal
>Priority: Blocker
> Attachments: server.log.2018-08-23-10
>
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> XmlIpcRegSvc -> 172.18.58.184:60686 (CLOSE_WAIT): there are many ports stuck in 
> CLOSE_WAIT like this, and that address is the application (client) side. Why do 
> they keep waiting? Memory is not reclaimed either. My three servers each have 2 
> cores and 4 GB of memory. Kafka was previously given 3 GB; when memory ran out it 
> reported an off-heap (direct) memory overflow, so I limited Kafka to at most 2 GB, 
> but after the master node runs for a while it again reports heap and off-heap 
> memory overflow. `free -m` shows only about 100 MB of memory left, and I do not 
> know where the memory is going; the Kafka process takes up about 80% of the 
> memory and CPU is above 100%. What could be the reason? The configuration 
> parameters have been checked against the official website; even using all the 
> defaults does not work.
> ` 1772 liandong 20 0 6398984 2.146g 16112 S 101.3 58.0 93:59.72 
> /usr/local/jdk1.8/bin/java -Xmx2G -Xms1G -server -XX:+UseG1GC 
> -XX:+HeapDumpOnOutOfMemoryError -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent 
> -Djava.awt.headless=true -XX:MaxDirectMemorySize=512m 
> -Xloggc:/data/kafka/bin/../logs/kafkaSer+...`
> kafka server.log log
> `[2018-08-23 07:56:11,788] INFO [GroupCoordinator 0]: Stabilized group 
> consumer.web.log generation 268 (__consumer_offsets-24) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-08-23 07:56:12,054] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Java heap space
> [2018-08-23 07:56:13,846] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Java heap space
> [2018-08-23 07:56:15,673] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Direct buffer memory
>  at java.nio.Bits.reserveMemory(Bits.java:694)
>  at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
>  at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>  at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
>  at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>  at 
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
>  at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:145)
>  at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
>  at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
>  at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
>  at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:557)
>  at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:495)
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
>  at kafka.network.Processor.poll(SocketServer.scala:628)
>  at kafka.network.Processor.run(SocketServer.scala:545)
>  at java.lang.Thread.run(Thread.java:748)
> [2018-08-23 07:56:16,379] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Java heap space`
> 172.18.58.184:speedtrace (CLOSE_WAIT) 172.18.58.184 is the kafka client connection
> lsof -i | grep java 
> `java 1772 liandong 83u IPv4 7990697 0t0 TCP *:36145 (LISTEN)
> java 1772 liandong 84u IPv4 7990698 0t0 TCP *:9099 (LISTEN)
> java 1772 liandong 85u IPv4 7990701 0t0 TCP *:40745 (LISTEN)
> java 1772 liandong 100u IPv4 7990709 0t0 TCP 
> prod_data_kafka_2:44688->prod_data_zk:eforward (ESTABLISHED)
> java 1772 liandong 193u IPv4 7989816 0t0 TCP prod_data_kafka_2:XmlIpcRegSvc 
> (LISTEN)
> java 1772 liandong 224u IPv4 8019955 0t0 TCP 
> prod_data_kafka_2:9099->172.18.58.184:47430 (ESTABLISHED)
> java 1772 liandong 228u IPv4 8018733 0t0 TCP 
> prod_data_kafka_2:XmlIpcRegSvc->172.18.58.184:33032 (CLOSE_WAIT)
> java 

[jira] [Comment Edited] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188319#comment-17188319
 ] 

Levani Kokhreidze edited comment on KAFKA-10454 at 9/1/20, 10:05 AM:
-

I guess, in this scenario, Kafka Streams should do a co-partitioning check and 
enforce the number of partitions inherited from the table source topic?
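
This is not something Kafka Streams does today; purely to illustrate the idea, 
here is a minimal application-side sketch of such a check using the public Admin 
API. Topic names are assumptions borrowed from the integration test, not anything 
Streams does internally:

{code:java}
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;

public class CoPartitioningCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // "topic-a" = stream source, "topic-b" = table source (assumed names)
            Map<String, TopicDescription> topics =
                admin.describeTopics(Arrays.asList("topic-a", "topic-b")).all().get();
            int streamPartitions = topics.get("topic-a").partitions().size();
            int tablePartitions = topics.get("topic-b").partitions().size();
            if (streamPartitions != tablePartitions) {
                // fail fast instead of letting the application loop in REBALANCING
                throw new IllegalStateException("Stream and table source topics are not"
                    + " co-partitioned: " + streamPartitions + " vs " + tablePartitions);
            }
        }
    }
}
{code}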


was (Author: lkokhreidze):
I guess, in this scenario, Kafka Streams should do co-partitioning check and 
should fail early on with some meaningful error message to end user.

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -
>
> Key: KAFKA-10454
> URL: https://issues.apache.org/jira/browse/KAFKA-10454
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
>
> Here's integration test: [https://github.com/apache/kafka/pull/9237]
>  
> From the first glance, issue is that when one joins stream to table, and 
> table source topic doesn't have same number of partitions as stream topic, 
> `StateChangelogReader` tries to recover state from changelog (which in this 
> case is the same as source topic) for table from partitions that don't exist. 
> Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> 

[jira] [Comment Edited] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188334#comment-17188334
 ] 

Levani Kokhreidze edited comment on KAFKA-10454 at 9/1/20, 9:58 AM:


The workaround we have used so far is to force repartitioning with the number of 
partitions equal to the table source topic. Based on the example from the 
integration test posted above, it would look like this:

 
{code:java}
stream
.selectKey((key, value) -> key)
.repartition(Repartitioned.numberOfPartitions(2))
.join(table, (value1, value2) -> value2)
.to(outputTopic);

{code}
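
For context, here is a minimal sketch of the kind of topology that triggers the 
loop when the repartition step is left out; topic names and partition counts are 
assumptions based on the description below, not taken from the actual application:

{code:java}
// Assumed setup: the stream source topic has more partitions than the table
// source topic (e.g. "topic-a" with 4 vs "topic-b" with 2), so the table's
// restoration looks for changelog partitions that don't exist and the
// application never leaves REBALANCING.
String outputTopic = "output-topic";
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("topic-a");
KTable<String, String> table = builder.table("topic-b");

stream
    .selectKey((key, value) -> key)
    // no repartition(...) here, so the join is not co-partitioned
    .join(table, (value1, value2) -> value2)
    .to(outputTopic);
{code}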


was (Author: lkokhreidze):
Workaround we have used so far is to force repartitioning with num of 
partitions that equal to table source topic. Based on example from the 
integration test posted above, it would look like this:

 
{code:java}
/stream
.selectKey((key, value) -> key)
.repartition(Repartitioned.numberOfPartitions(2))
.join(table, (value1, value2) -> value2)
.to(outputTopic);

{code}

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -
>
> Key: KAFKA-10454
> URL: https://issues.apache.org/jira/browse/KAFKA-10454
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
>
> Here's integration test: [https://github.com/apache/kafka/pull/9237]
>  
> From the first glance, issue is that when one joins stream to table, and 
> table source topic doesn't have same number of partitions as stream topic, 
> `StateChangelogReader` tries to recover state from changelog (which in this 
> case is the same as source topic) for table from partitions that don't exist. 
> Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 

[jira] [Commented] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188334#comment-17188334
 ] 

Levani Kokhreidze commented on KAFKA-10454:
---

The workaround we have used so far is to force repartitioning with the number of 
partitions equal to the table source topic. Based on the example from the 
integration test posted above, it would look like this:

 
{code:java}
stream
.selectKey((key, value) -> key)
.repartition(Repartitioned.numberOfPartitions(2))
.join(table, (value1, value2) -> value2)
.to(outputTopic);

{code}

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -
>
> Key: KAFKA-10454
> URL: https://issues.apache.org/jira/browse/KAFKA-10454
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
>
> Here's integration test: [https://github.com/apache/kafka/pull/9237]
>  
> From the first glance, issue is that when one joins stream to table, and 
> table source topic doesn't have same number of partitions as stream topic, 
> `StateChangelogReader` tries to recover state from changelog (which in this 
> case is the same as source topic) for table from partitions that don't exist. 
> Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> 

[jira] [Assigned] (KAFKA-5892) Connector property override does not work unless setting ALL converter properties

2020-09-01 Thread Mani Jindal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mani Jindal reassigned KAFKA-5892:
--

Assignee: (was: Mani Jindal)

> Connector property override does not work unless setting ALL converter 
> properties
> -
>
> Key: KAFKA-5892
> URL: https://issues.apache.org/jira/browse/KAFKA-5892
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yeva Byzek
>Priority: Minor
>  Labels: newbie
>
> A single connector setting override {{value.converter.schemas.enable=false}} 
> only takes effect if ALL of the converter properties are overridden in the 
> connector.
> At minimum, we should give the user a warning or error that this will be ignored.
> We should also consider changing the behavior to allow the single property 
> override even if all the converter properties are not specified, but this 
> requires discussion to evaluate the impact of this change.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-5892) Connector property override does not work unless setting ALL converter properties

2020-09-01 Thread Mani Jindal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mani Jindal reassigned KAFKA-5892:
--

Assignee: Mani Jindal

> Connector property override does not work unless setting ALL converter 
> properties
> -
>
> Key: KAFKA-5892
> URL: https://issues.apache.org/jira/browse/KAFKA-5892
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yeva Byzek
>Assignee: Mani Jindal
>Priority: Minor
>  Labels: newbie
>
> A single connector setting override {{value.converter.schemas.enable=false}} 
> only takes effect if ALL of the converter properties are overridden in the 
> connector.
> At minimum, we should give the user a warning or error that this will be ignored.
> We should also consider changing the behavior to allow the single property 
> override even if all the converter properties are not specified, but this 
> requires discussion to evaluate the impact of this change.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze updated KAFKA-10454:
--
Affects Version/s: 2.6.0

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -
>
> Key: KAFKA-10454
> URL: https://issues.apache.org/jira/browse/KAFKA-10454
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
>
> Here's integration test: [https://github.com/apache/kafka/pull/9237]
>  
> From the first glance, issue is that when one joins stream to table, and 
> table source topic doesn't have same number of partitions as stream topic, 
> `StateChangelogReader` tries to recover state from changelog (which in this 
> case is the same as source topic) for table from partitions that don't exist. 
> Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> 

[jira] [Updated] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze updated KAFKA-10454:
--
Fix Version/s: (was: 2.6.0)

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -
>
> Key: KAFKA-10454
> URL: https://issues.apache.org/jira/browse/KAFKA-10454
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Major
>
> Here's integration test: [https://github.com/apache/kafka/pull/9237]
>  
> From the first glance, issue is that when one joins stream to table, and 
> table source topic doesn't have same number of partitions as stream topic, 
> `StateChangelogReader` tries to recover state from changelog (which in this 
> case is the same as source topic) for table from partitions that don't exist. 
> Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset 

[jira] [Commented] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188319#comment-17188319
 ] 

Levani Kokhreidze commented on KAFKA-10454:
---

I guess, in this scenario, Kafka Streams should do a co-partitioning check and 
fail early with a meaningful error message to the end user.

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -
>
> Key: KAFKA-10454
> URL: https://issues.apache.org/jira/browse/KAFKA-10454
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Major
> Fix For: 2.6.0
>
>
> Here's integration test: [https://github.com/apache/kafka/pull/9237]
>  
> From the first glance, issue is that when one joins stream to table, and 
> table source topic doesn't have same number of partitions as stream topic, 
> `StateChangelogReader` tries to recover state from changelog (which in this 
> case is the same as source topic) for table from partitions that don't exist. 
> Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> 

[jira] [Comment Edited] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188313#comment-17188313
 ] 

Levani Kokhreidze edited comment on KAFKA-10454 at 9/1/20, 9:43 AM:


CC [~mjsax] [~vvcephei] [~guozhang] 

I may try to fix this as soon as I can - but if by any chance somebody wants to 
pick this up before me, feel free to do so.


was (Author: lkokhreidze):
CC [~mjsax] [~vvcephei] [~guozhang] 

I may try to fix this as soon as I can - but if any chance somebody wants to 
pick it up before me, feel free to do so.

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -
>
> Key: KAFKA-10454
> URL: https://issues.apache.org/jira/browse/KAFKA-10454
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Major
> Fix For: 2.6.0
>
>
> Here's integration test: [https://github.com/apache/kafka/pull/9237]
>  
> From the first glance, issue is that when one joins stream to table, and 
> table source topic doesn't have same number of partitions as stream topic, 
> `StateChangelogReader` tries to recover state from changelog (which in this 
> case is the same as source topic) for table from partitions that don't exist. 
> Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> 

[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-01 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r480997443



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -203,6 +203,10 @@ public String queryableStoreName() {
 processorSupplier,
 tableNode,
 builder);
+
+kTable.enableSendingOldValues();

Review comment:
   If we call `this.enableSendingOldValues()` then `enableSendingOldValues` 
is not called on the `KTableFilter` instance. Whereas calling 
`kTable.enableSendingOldValues()` does call 
`KTableFilter.enableSendingOldValues()`.
   
   The call to `KTableFilter.enableSendingOldValues()` is needed to ensure the 
filter expects the old values.
   
   Is it more correct to call `enableSOV` on both the filter and `this` 
explicitly, so that the downstream `kTable` doesn't have it set?
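
   For readers following along, a rough, hypothetical sketch of the kind of DSL 
topology this discussion concerns (a filtered view of a materialized table 
feeding a join); all names are invented for illustration and this is not code 
from the PR:

```java
// Illustrative fragment only - not code from this PR.
// A filter downstream of a materialized state store: whether the filter has old
// values available (enableSendingOldValues) decides if it can tell that a record
// never passed the filter and so avoid forwarding a spurious tombstone.
final StreamsBuilder builder = new StreamsBuilder();

final KTable<String, String> table = builder.table(
    "input-table",
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("source-store"));

final KTable<String, String> filtered =
    table.filter((key, value) -> value != null && !value.isEmpty());

final KStream<String, String> stream = builder.stream("input-stream");

stream.join(filtered, (streamValue, tableValue) -> streamValue + "," + tableValue)
      .to("output");
```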





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188313#comment-17188313
 ] 

Levani Kokhreidze commented on KAFKA-10454:
---

CC [~mjsax] [~vvcephei] [~guozhang] 

I may try to fix this as soon as I can - but if by any chance somebody wants to 
pick it up before me, feel free to do so.

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -
>
> Key: KAFKA-10454
> URL: https://issues.apache.org/jira/browse/KAFKA-10454
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Major
> Fix For: 2.6.0
>
>
> Here's integration test: [https://github.com/apache/kafka/pull/9237]
>  
> From the first glance, issue is that when one joins stream to table, and 
> table source topic doesn't have same number of partitions as stream topic, 
> `StateChangelogReader` tries to recover state from changelog (which in this 
> case is the same as source topic) for table from partitions that don't exist. 
> Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> 

[jira] [Updated] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze updated KAFKA-10454:
--
Description: 
Here's integration test: [https://github.com/apache/kafka/pull/9237]

 

From the first glance, issue is that when one joins stream to table, and table 
source topic doesn't have same number of partitions as stream topic, 
`StateChangelogReader` tries to recover state from changelog (which in this 
case is the same as source topic) for table from partitions that don't exist. 
Logs are spammed with: 

 
{code:java}
[2020-09-01 12:33:07,508] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,508] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,508] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,510] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,510] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,510] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,510] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,513] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,513] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,513] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,513] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,515] INFO stream-thread 
[app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
 End offset for changelog 
topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; will 
retry in the next time. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
[2020-09-01 12:33:07,515] INFO stream-thread 

[GitHub] [kafka] lkokhreidze opened a new pull request #9237: KAFKA-10454 / integration test

2020-09-01 Thread GitBox


lkokhreidze opened a new pull request #9237:
URL: https://github.com/apache/kafka/pull/9237


   Integration test for 
[KAFKA-10454](https://github.com/lkokhreidze/kafka/pull/new/KAFKA-10454) bug
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-10454:
-

 Summary: Kafka Streams Stuck in infinite REBALANCING loop when 
stream <> table join partitions don't match
 Key: KAFKA-10454
 URL: https://issues.apache.org/jira/browse/KAFKA-10454
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Levani Kokhreidze
 Fix For: 2.6.0


TBD



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-01 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r480998514



##
File path: 
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##
@@ -123,7 +123,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() {
 
 assertThat(
 topology.stateStores().size(),
-equalTo(1));
+equalTo(2));

Review comment:
   Sounds good.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-01 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r480997443



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -203,6 +203,10 @@ public String queryableStoreName() {
 processorSupplier,
 tableNode,
 builder);
+
+kTable.enableSendingOldValues();

Review comment:
   If we call `this.enableSendingOldValues()` then `enableSendingOldValues` 
is not called on the `KTableFilter` instance. Whereas calling 
`kTable.enableSendingOldValues()` does call 
`KTableFilter.enableSendingOldValues()`.
   
   The call to `KTableFilter.enableSendingOldValues()` is needed to ensure the 
filter expects the old values.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-3190) KafkaProducer should not invoke callback in send()

2020-09-01 Thread Aakash Gupta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188279#comment-17188279
 ] 

Aakash Gupta commented on KAFKA-3190:
-

[~omkreddy] If no one is working on this, I can take this up. I've added my 
comment on this Jira bug as well: KAFKA-2200

> KafkaProducer should not invoke callback in send()
> --
>
> Key: KAFKA-3190
> URL: https://issues.apache.org/jira/browse/KAFKA-3190
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
>
> Currently KafkaProducer will invoke callback.onComplete() if it receives an 
> ApiException during send(). This breaks the guarantee that callback will be 
> invoked in order. It seems ApiException in send() only comes from metadata 
> refresh. If so, we can probably simply throw it instead of invoking 
> callback().
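
A hedged illustration of the ordering expectation at stake, using only the public 
producer API; topic and value names are invented and this is not code from the 
ticket:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallbackOrderingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                final int seq = i;
                producer.send(new ProducerRecord<>("my-topic", "key", "value-" + seq),
                    (metadata, exception) -> {
                        // Callers commonly assume callbacks fire in send() order per
                        // partition. If send() itself invoked this callback on an
                        // ApiException (e.g. during metadata refresh), it could run
                        // before the callbacks of earlier, still in-flight records.
                        if (exception != null) {
                            System.err.println("send " + seq + " failed: " + exception);
                        }
                    });
            }
        }
    }
}
{code}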



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10072) Kafkaconsumer is configured with different clientid parameters to obtain different results

2020-09-01 Thread Ankit Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188254#comment-17188254
 ] 

Ankit Kumar commented on KAFKA-10072:
-

Hey [~qq619618919],

I was trying to replicate the issue, but unable to find any anomaly in the 
Consumer behavior. Can you please share the steps to replicate the issue?

 

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
test_topic --consumer-property client.id=aa --from-beginning
SLF4J: Class path contains multiple SLF4J bindings.
...
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Test Msg 1
Test Msg 2

--

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
test_topic --consumer-property clientid=bb --from-beginning
SLF4J: Class path contains multiple SLF4J bindings.
...
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2020-09-01 13:48:13,759] WARN The configuration 'clientid' was supplied but 
isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
Test Msg 1
Test Msg 2
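
For reference, a small sketch of the same distinction in the Java consumer; the 
values are invented and only meant to show which key the client actually 
recognises:

{code:java}
// The recognised key is "client.id" (ConsumerConfig.CLIENT_ID_CONFIG).
// An unknown key such as "clientid" only produces the WARN shown above
// and is otherwise ignored, so the consumer still works normally.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "aa");   // recognised key: "client.id"
props.put("clientid", "bb");                        // unknown key: WARN, then ignored
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
{code}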

> Kafkaconsumer is configured with different clientid parameters to obtain 
> different results
> --
>
> Key: KAFKA-10072
> URL: https://issues.apache.org/jira/browse/KAFKA-10072
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0
> Environment: centos7.6 8C 32G
>Reporter: victor
>Assignee: Ankit Kumar
>Priority: Blocker
>
> kafka-console-consumer.sh --bootstrap-server host1:port --consumer-property 
> {color:#DE350B}client.id=aa{color} --from-beginning --topic topicA
> {color:#DE350B}There's no data
> {color}
> kafka-console-consumer.sh --bootstrap-server host1:port --consumer-property 
> {color:#DE350B}clientid=bb{color} --from-beginning --topic topicA
> {color:#DE350B}Successfully consume data{color}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There

2020-09-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188235#comment-17188235
 ] 

Luke Chen edited comment on KAFKA-10450 at 9/1/20, 7:54 AM:


I think this behavior is expected. The *address already in use* error was 
displayed, but in *kafka-server-start.sh*. The console-producer.sh client 
assumes the Kafka server is up and just sends messages to it directly. It 
throws an exception when the server is not responding, or a different 
exception when the server behaves unexpectedly (in your case, the unexpected 
response from the SonarQube H2 DB).

 
{code:java}
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server 
localhost:9092
...

[2020-09-01 15:47:58,404] ERROR [KafkaServer id=0] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 
0.0.0.0:9092: Address already in use.
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:673)
at kafka.network.Acceptor.<init>(SocketServer.scala:541)
at kafka.network.SocketServer.createAcceptor(SocketServer.scala:278)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:250)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:248)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:248)
at kafka.network.SocketServer.startup(SocketServer.scala:122)
at kafka.server.KafkaServer.startup(KafkaServer.scala:299)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:669)
... 13 more

{code}


was (Author: showuon):
I think this behavior is expected. The *address already in use* error did 
display, in *kafka-server-start.sh*. The console-producer.sh (client) is 
expected that kafka server is up and just send messages to kafka server 
directly. It will throw exception when the server is not responding, but not 
the server working differently (in your case, it's the unexpected response from 
sonarqube H2 DB).

 
{code:java}
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server 
localhost:9092
...

[2020-09-01 15:47:58,404] ERROR [KafkaServer id=0] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 
0.0.0.0:9092: Address already in use.
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:673)
at kafka.network.Acceptor.<init>(SocketServer.scala:541)
at kafka.network.SocketServer.createAcceptor(SocketServer.scala:278)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:250)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:248)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:248)
at kafka.network.SocketServer.startup(SocketServer.scala:122)
at kafka.server.KafkaServer.startup(KafkaServer.scala:299)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
at 

[jira] [Commented] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are n

2020-09-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188235#comment-17188235
 ] 

Luke Chen commented on KAFKA-10450:
---

I think this behavior is expected. The *address already in use* error did 
display, in *kafka-server-start.sh*. The console-producer.sh (client) is 
expected that kafka server is up and just send messages to kafka server 
directly. It will throw exception when the server is not responding, but not 
the server working differently (in your case, it's the unexpected response from 
sonarqube H2 DB).

 
{code:java}
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server 
localhost:9092
...

[2020-09-01 15:47:58,404] ERROR [KafkaServer id=0] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 
0.0.0.0:9092: Address already in use.
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:673)
at kafka.network.Acceptor.<init>(SocketServer.scala:541)
at kafka.network.SocketServer.createAcceptor(SocketServer.scala:278)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:250)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:248)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:248)
at kafka.network.SocketServer.startup(SocketServer.scala:122)
at kafka.server.KafkaServer.startup(KafkaServer.scala:299)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:669)
... 13 more

{code}

> console-producer throws Uncaught error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender) 
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> ---
>
> Key: KAFKA-10450
> URL: https://issues.apache.org/jira/browse/KAFKA-10450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0
> Environment: Kafka Version 2.6.0
> MacOS Version - macOS Catalina 10.15.6 (19G2021)
> java version "11.0.8" 2020-07-14 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
>Reporter: Jigar Naik
>Priority: Minor
>
> Kafka-console-producer.sh gives below error on Mac 
> ERROR [Producer clientId=console-producer] Uncaught error in kafka producer 
> I/O thread:  (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> *Steps to re-produce the issue.* 
> Download Kafka from 
> [kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz]
>  
> Change data and log directory (Optional)
> Create Topic Using below command 
>  
> {code:java}
> ./kafka-topics.sh \
>  --create \
>  --zookeeper localhost:2181 \
>  --replication-factor 1 \
>  --partitions 1 \
>  --topic my-topic{code}
>  
> Start Kafka console consumer using below command
>  
> {code:java}
> ./kafka-console-consumer.sh \
>  --topic my-topic \
>  --from-beginning \
>  --bootstrap-server localhost:9092{code}
>  
> Then start Kafka console producer, which gives below output
>  
> {code:java}
> ./kafka-console-producer.sh \
>  --topic my-topic \
>      --bootstrap-server 127.0.0.1:9092
> >[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] 
> >Uncaught error in kafka producer I/O thread:  
> >(org.apache.kafka.clients.producer.internals.Sender)
> java.nio.BufferUnderflowException
> at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650)
> at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391)
> at 
> 

[GitHub] [kafka] cnZach opened a new pull request #9236: MINOR: Log warn message with details when there's kerberos login issue

2020-09-01 Thread GitBox


cnZach opened a new pull request #9236:
URL: https://github.com/apache/kafka/pull/9236


   …though we will still retry
   
   Currently, we just capture the exception and retry later, so we can't figure 
out what happened at runtime. It would be helpful to log a warn message with 
the details.
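   
   A rough sketch of the intent (hypothetical method names, not the actual 
`KerberosLogin` internals):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: reLogin() and scheduleRelogin() are hypothetical stand-ins for
// the real re-login/retry logic; the point is to surface the failure details.
public class KerberosLoginLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(KerberosLoginLoggingSketch.class);

    void attemptRelogin() {
        try {
            reLogin();
        } catch (Exception e) {
            // Instead of silently swallowing the exception and retrying, log it at
            // WARN so Kerberos misconfiguration is visible at runtime.
            log.warn("Kerberos login attempt failed, will retry", e);
            scheduleRelogin();
        }
    }

    private void reLogin() throws Exception {
        // hypothetical: performs the actual Kerberos re-login
    }

    private void scheduleRelogin() {
        // hypothetical: schedules the next retry with backoff
    }
}
```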
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   Minor logging change, so no extra test is required. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

2020-09-01 Thread GitBox


cadonna commented on a change in pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#discussion_r480927013



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##
@@ -174,22 +241,163 @@ private void initSensors(final StreamsMetricsImpl 
streamsMetrics, final RocksDBM
 numberOfFileErrorsSensor = 
RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricContext);
 }
 
-private void initGauges(final StreamsMetricsImpl streamsMetrics, final 
RocksDBMetricContext metricContext) {
-RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, 
metricContext, (metricsConfig, now) -> {
+private void initGauges(final StreamsMetricsImpl streamsMetrics,
+final RocksDBMetricContext metricContext) {
+RocksDBMetrics.addNumImmutableMemTableMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(NUMBER_OF_IMMUTABLE_MEMTABLES)
+);
+RocksDBMetrics.addCurSizeActiveMemTable(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ACTIVE_MEMTABLE)
+);
+RocksDBMetrics.addCurSizeAllMemTables(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ALL_MEMTABLES)
+);
+RocksDBMetrics.addSizeAllMemTables(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(SIZE_OF_ALL_MEMTABLES)
+);
+RocksDBMetrics.addNumEntriesActiveMemTableMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE)
+);
+RocksDBMetrics.addNumDeletesActiveMemTableMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_ACTIVE_MEMTABLE)
+);
+RocksDBMetrics.addNumEntriesImmMemTablesMetric(
+streamsMetrics,
+metricContext,
+
gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES)
+);
+RocksDBMetrics.addNumDeletesImmMemTablesMetric(
+streamsMetrics,
+metricContext,
+
gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES)
+);
+RocksDBMetrics.addMemTableFlushPending(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(MEMTABLE_FLUSH_PENDING)
+);
+RocksDBMetrics.addNumRunningFlushesMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_FLUSHES)
+);
+RocksDBMetrics.addCompactionPendingMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(COMPACTION_PENDING)
+);
+RocksDBMetrics.addNumRunningCompactionsMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_COMPACTIONS)
+);
+RocksDBMetrics.addEstimatePendingCompactionBytesMetric(
+streamsMetrics,
+metricContext,
+
gaugeToComputeSumOfProperties(ESTIMATED_BYTES_OF_PENDING_COMPACTION)
+);
+RocksDBMetrics.addTotalSstFilesSizeMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(TOTAL_SST_FILES_SIZE)
+);
+RocksDBMetrics.addLiveSstFilesSizeMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(LIVE_SST_FILES_SIZE)
+);
+RocksDBMetrics.addNumLiveVersionMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(NUMBER_OF_LIVE_VERSIONS)
+);
+RocksDBMetrics.addEstimateNumKeysMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(ESTIMATED_NUMBER_OF_KEYS)
+);
+RocksDBMetrics.addEstimateTableReadersMemMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(ESTIMATED_MEMORY_OF_TABLE_READERS)
+);
+RocksDBMetrics.addBackgroundErrorsMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeSumOfProperties(NUMBER_OF_BACKGROUND_ERRORS)
+);
+RocksDBMetrics.addBlockCacheCapacityMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeBlockCacheMetrics(CAPACITY_OF_BLOCK_CACHE)
+);
+RocksDBMetrics.addBlockCacheUsageMetric(
+streamsMetrics,
+metricContext,
+gaugeToComputeBlockCacheMetrics(USAGE_OF_BLOCK_CACHE)
+);
+

[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-09-01 Thread GitBox


showuon commented on pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#issuecomment-684518374


   @mjsax , I didn't see the Jenkins build results. Should we run again? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-09-01 Thread GitBox


showuon commented on pull request #9062:
URL: https://github.com/apache/kafka/pull/9062#issuecomment-684518643


   @omkreddy , I didn't see the Jenkins build results. Should we run again? 
Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

2020-09-01 Thread GitBox


cadonna commented on a change in pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#discussion_r480924297



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, 
final Statistics statist
 statistics != null &&
 
storeToValueProviders.values().stream().anyMatch(valueProviders -> 
valueProviders.statistics == null))) {
 
-throw new IllegalStateException("Statistics for store \"" + 
segmentName + "\" of task " + taskId +
-" is" + (statistics == null ? " " : " not ") + "null although 
the statistics of another store in this " +
+throw new IllegalStateException("Statistics for segment " + 
segmentName + " of task " + taskId +
+" is" + (statistics == null ? " " : " not ") + "null although 
the statistics of another segment in this " +
 "metrics recorder is" + (statistics != null ? " " : " not ") + 
"null. " +
 "This is a bug in Kafka Streams. " +
 "Please open a bug report under 
https://issues.apache.org/jira/projects/KAFKA/issues");
 }
 }
 
+private void verifyDbAndCacheAndStatistics(final String segmentName,
+   final RocksDB db,
+   final Cache cache,
+   final Statistics statistics) {
+for (final DbAndCacheAndStatistics valueProviders : 
storeToValueProviders.values()) {
+verifyIfSomeAreNull(segmentName, statistics, 
valueProviders.statistics, "statistics");
+verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, 
"cache");
+if (db == valueProviders.db) {
+throw new IllegalStateException("DB instance for store " + 
segmentName + " of task " + taskId +
+" was already added for another segment as a value 
provider. This is a bug in Kafka Streams. " +
+"Please open a bug report under 
https://issues.apache.org/jira/projects/KAFKA/issues");
+}
+if (storeToValueProviders.size() == 1 && cache != 
valueProviders.cache) {
+singleCache = false;
+} else if (singleCache && cache != valueProviders.cache || 
!singleCache && cache == valueProviders.cache) {
+throw new IllegalStateException("Caches for store " + 
storeName + " of task " + taskId +
+" are either not all distinct or do not all refer to the 
same cache. This is a bug in Kafka Streams. " +
+"Please open a bug report under 
https://issues.apache.org/jira/projects/KAFKA/issues");
+}
+}
+}
+
+private void verifyIfSomeAreNull(final String segmentName,

Review comment:
   I will rename it to `verifyConsistencyOfValueProvidersAcrossSegments()`. 
WDYT?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-09-01 Thread GitBox


showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-684517819


   @ijuma , could you help review this PR again? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

2020-09-01 Thread GitBox


cadonna commented on a change in pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#discussion_r480914782



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, 
final Statistics statist
 statistics != null &&
 
storeToValueProviders.values().stream().anyMatch(valueProviders -> 
valueProviders.statistics == null))) {
 
-throw new IllegalStateException("Statistics for store \"" + 
segmentName + "\" of task " + taskId +
-" is" + (statistics == null ? " " : " not ") + "null although 
the statistics of another store in this " +
+throw new IllegalStateException("Statistics for segment " + 
segmentName + " of task " + taskId +
+" is" + (statistics == null ? " " : " not ") + "null although 
the statistics of another segment in this " +
 "metrics recorder is" + (statistics != null ? " " : " not ") + 
"null. " +
 "This is a bug in Kafka Streams. " +
 "Please open a bug report under 
https://issues.apache.org/jira/projects/KAFKA/issues");
 }
 }
 
+private void verifyDbAndCacheAndStatistics(final String segmentName,
+   final RocksDB db,
+   final Cache cache,
+   final Statistics statistics) {
+for (final DbAndCacheAndStatistics valueProviders : 
storeToValueProviders.values()) {
+verifyIfSomeAreNull(segmentName, statistics, 
valueProviders.statistics, "statistics");
+verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, 
"cache");
+if (db == valueProviders.db) {
+throw new IllegalStateException("DB instance for store " + 
segmentName + " of task " + taskId +
+" was already added for another segment as a value 
provider. This is a bug in Kafka Streams. " +
+"Please open a bug report under 
https://issues.apache.org/jira/projects/KAFKA/issues");
+}
+if (storeToValueProviders.size() == 1 && cache != 
valueProviders.cache) {

Review comment:
   When RocksDB uses the memory-bounded configuration, all RocksDB 
instances use the same cache, i.e., a reference to the same cache object. 
   At this point in the code the value providers for the given segment with the 
cache `cache` have not yet been added to the value providers used in this 
recorder. To understand whether different caches are used for different segments 
(i.e., `singleCache = false`), it is not enough to check that only one 
cache has been added so far; we also need to check whether the already added 
cache (i.e., `valueProviders.cache`) is different from the cache to add 
(`cache`).
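   
   A toy, self-contained illustration (hypothetical names, not the real 
recorder code) of why the reference comparison is needed in addition to the 
size check:

```java
import java.util.ArrayList;
import java.util.List;

public class SingleCacheCheckSketch {
    // Stand-in for a RocksDB Cache handle; only object identity matters here.
    static final class Cache { }

    public static void main(String[] args) {
        Cache shared = new Cache();
        List<Cache> registered = new ArrayList<>();
        registered.add(shared); // one value provider already registered

        // Case 1: memory-bounded config, the next segment reuses the same cache object.
        Cache next = shared;
        boolean distinctCaches = registered.size() == 1 && next != registered.get(0);
        System.out.println("shared cache    -> singleCache stays " + !distinctCaches);   // true

        // Case 2: default config, the next segment brings its own cache object.
        next = new Cache();
        distinctCaches = registered.size() == 1 && next != registered.get(0);
        System.out.println("distinct caches -> singleCache becomes " + !distinctCaches); // false
    }
}
```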





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

2020-09-01 Thread GitBox


cadonna commented on a change in pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#discussion_r480914782



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, 
final Statistics statist
 statistics != null &&
 
storeToValueProviders.values().stream().anyMatch(valueProviders -> 
valueProviders.statistics == null))) {
 
-throw new IllegalStateException("Statistics for store \"" + 
segmentName + "\" of task " + taskId +
-" is" + (statistics == null ? " " : " not ") + "null although 
the statistics of another store in this " +
+throw new IllegalStateException("Statistics for segment " + 
segmentName + " of task " + taskId +
+" is" + (statistics == null ? " " : " not ") + "null although 
the statistics of another segment in this " +
 "metrics recorder is" + (statistics != null ? " " : " not ") + 
"null. " +
 "This is a bug in Kafka Streams. " +
 "Please open a bug report under 
https://issues.apache.org/jira/projects/KAFKA/issues");
 }
 }
 
+private void verifyDbAndCacheAndStatistics(final String segmentName,
+   final RocksDB db,
+   final Cache cache,
+   final Statistics statistics) {
+for (final DbAndCacheAndStatistics valueProviders : 
storeToValueProviders.values()) {
+verifyIfSomeAreNull(segmentName, statistics, 
valueProviders.statistics, "statistics");
+verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, 
"cache");
+if (db == valueProviders.db) {
+throw new IllegalStateException("DB instance for store " + 
segmentName + " of task " + taskId +
+" was already added for another segment as a value 
provider. This is a bug in Kafka Streams. " +
+"Please open a bug report under 
https://issues.apache.org/jira/projects/KAFKA/issues");
+}
+if (storeToValueProviders.size() == 1 && cache != 
valueProviders.cache) {

Review comment:
   If RocksDB uses the memory bounded configuration, all RocksDB instances 
use the same cache, i.e., the reference to the same cache. 
   At this point in the code the value providers for the given segment with the 
cache `cache` has not been added yet to the value providers used in this 
recorder. To understand if different caches are used for different segments 
(i.e., `singleCache = false`), it is not enough to just check if only one 
single cache has been already added, we also need to check if the already added 
cache (i.e., `valueProviders.cache`) is different from the cache to add 
(`cache`).   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org