[GitHub] [kafka] ivanyu commented on a change in pull request #7561: [WIP] KAFKA-7739: Tiered storage

2020-08-25 Thread GitBox


ivanyu commented on a change in pull request #7561:
URL: https://github.com/apache/kafka/pull/7561#discussion_r477044228



##
File path: 
remote-storage-managers/s3/src/main/java/org/apache/kafka/rsm/s3/S3RemoteStorageManagerConfig.java
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.rsm.s3;
+
+import java.util.Map;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+
+/**
+ * A configuration for {@link S3RemoteStorageManager}.
+ */
+public class S3RemoteStorageManagerConfig extends AbstractConfig {
+public static final String S3_BUCKET_NAME_CONFIG = "s3.bucket.name";

Review comment:
   Maybe it's worth doing 
`config.originalsWithPrefix(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX)` instead of 
adding the prefix manually to all configurations. I'll look into this.
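
For illustration, a minimal sketch of that idea (the prefix constant and the helper are assumptions, not the PR's actual code): `AbstractConfig#originalsWithPrefix` returns the original entries whose keys start with the given prefix, with the prefix stripped, so the nested config never needs to repeat it.

```java
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;

public class PrefixedConfigSketch {
    // Hypothetical helper: pull the remote-storage-manager overrides out of a broker
    // config. originalsWithPrefix("remote.storage.manager.") would turn a key like
    // "remote.storage.manager.s3.bucket.name" into "s3.bucket.name".
    static Map<String, Object> remoteStorageOverrides(final AbstractConfig brokerConfig) {
        return brokerConfig.originalsWithPrefix("remote.storage.manager.");
    }
}
```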





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-08-25 Thread GitBox


JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r477019086



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
-// so ignore unless it is a left join
+// we do join iff the joining keys are equal, thus, if the mappedKey 
is null we cannot join
+// and just ignore the record.
 //
 // we also ignore the record if value is null, because in a key-value 
data model a null-value indicates
 // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
-if (key == null || value == null) {
+final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+if (!maybeMappedKey.isPresent()) {
 LOG.warn(
 "Skipping record due to null key or value. key=[{}] value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
 key, value, context().topic(), context().partition(), 
context().offset()
 );
 droppedRecordsSensor.record();
 } else {
-final K2 mappedKey = keyMapper.apply(key, value);
-final V2 value2 = mappedKey == null ? null : 
getValueOrNull(valueGetter.get(mappedKey));
+final K2 mappedKey = maybeMappedKey.get();
+final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
 if (leftJoin || value2 != null) {
 context().forward(key, joiner.apply(value, value2));
 }
 }
 }
 
+private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+if (value == null) {
+return Optional.empty();

Review comment:
   Yep agreed, was originally worried about NPEs arising from the null 
value as well. Done now :) 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
-// so ignore unless it is a left join
+// we do join iff the joining keys are equal, thus, if the mappedKey 
is null we cannot join
+// and just ignore the record.
 //
 // we also ignore the record if value is null, because in a key-value 
data model a null-value indicates
 // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
-if (key == null || value == null) {
+final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+if (!maybeMappedKey.isPresent()) {
 LOG.warn(
 "Skipping record due to null key or value. key=[{}] value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
 key, value, context().topic(), context().partition(), 
context().offset()
 );
 droppedRecordsSensor.record();
 } else {
-final K2 mappedKey = keyMapper.apply(key, value);
-final V2 value2 = mappedKey == null ? null : 
getValueOrNull(valueGetter.get(mappedKey));
+final K2 mappedKey = maybeMappedKey.get();
+final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
 if (leftJoin || value2 != null) {
 context().forward(key, joiner.apply(value, value2));
 }
 }
 }
 
+private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+if (value == null) {
+return Optional.empty();

Review comment:
   Yep agreed – was originally worried about NPEs arising from the null 
value as well. Done now :) 
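
As an editorial aside, a minimal sketch of the pattern the hunks above introduce (names and generics are illustrative, not the verbatim Streams internals): a null value and a null mapped key both collapse into `Optional.empty()`, giving the caller a single skip-the-record branch while still allowing a null key.

```java
import java.util.Optional;
import java.util.function.BiFunction;

public class MappedKeySketch {
    // Returns the mapped key, or Optional.empty() if the record should be skipped.
    // Note: the key itself may be null; only a null value or a null mapped key
    // causes the record to be dropped.
    static <K1, V1, K2> Optional<K2> maybeExtractMappedKey(final K1 key,
                                                           final V1 value,
                                                           final BiFunction<K1, V1, K2> keyMapper) {
        if (value == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(keyMapper.apply(key, value));
    }
}
```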





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:

[jira] [Updated] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-08-25 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10304:
---
Component/s: mirrormaker

> Revisit and improve the tests of MirrorMaker 2
> --
>
> Key: KAFKA-10304
> URL: https://issues.apache.org/jira/browse/KAFKA-10304
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Minor
>
> Due to the rapid development of Kafka MM 2, the unit and integration tests of 
> MirrorMaker 2 were written just to cover each individual feature, and some of 
> them are simply copied from existing tests with small tweaks. It may be a good 
> time to revisit and improve the tests, possibly along the following lines:
> (1) Are 100 messages enough for the integration tests?
> (2) What about failures in the middle of an integration test?
> (3) Do we want to check other message types (e.g. checkpoint, heartbeat, offset 
> sync) beyond the mirrored messages in the integration tests?
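
As a rough illustration of point (3) above, an integration test could also assert that MM2's auxiliary topics exist on the target cluster. A hedged sketch follows; the bootstrap address and topic names are assumptions about a primary->backup flow, not fixed constants.

{code:java}
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class Mm2AuxTopicsCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "backup-kafka:9092");
        try (Admin admin = Admin.create(props)) {
            // Beyond the mirrored records, check that the heartbeat and checkpoint
            // topics were created on the target cluster.
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("heartbeats present: " + topics.contains("heartbeats"));
            System.out.println("checkpoints present: " + topics.contains("primary.checkpoints.internal"));
        }
    }
}
{code}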



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-25 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10339:
---
Component/s: mirrormaker

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on the Kafka Connect framework, more 
> specifically on the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out of the box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461, 
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080 and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-25 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891
 ] 

Ning Zhang edited comment on KAFKA-10424 at 8/26/20, 3:13 AM:
--

[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:

{code:java}
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
{code}

After running MM2:

{code:java}
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:*cleanup.policy=compact*
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
{code}

note that `cleanup.policy=compact` is added after MM2 start


was (Author: yangguo1220):
[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:

{code:java}
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
{code}

After running MM2:

{code:java}
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:cleanup.policy=compact
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
{code}

note that `cleanup.policy=compact` is added after MM2 start

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-25 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891
 ] 

Ning Zhang edited comment on KAFKA-10424 at 8/26/20, 3:13 AM:
--

[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:

{code:java}
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
{code}

After running MM2:

{code:java}
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:cleanup.policy=compact
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
{code}

note that `cleanup.policy=compact` is added after MM2 start


was (Author: yangguo1220):
[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:

{code:java}
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
{code}

After running MM2:

{code:java}
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:*cleanup.policy=compact*
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
{code}

note that `cleanup.policy=compact` is added after MM2 start

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-25 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891
 ] 

Ning Zhang edited comment on KAFKA-10424 at 8/26/20, 3:12 AM:
--

[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:

{code:java}
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
{code}

After running MM2:

{code:java}
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:cleanup.policy=compact
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
{code}

note that `cleanup.policy=compact` is added after MM2 start


was (Author: yangguo1220):
[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:
```
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
```
After running MM2:
```
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:cleanup.policy=compact
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
```

note that `cleanup.policy=compact` is added after MM2 start

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-25 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891
 ] 

Ning Zhang commented on KAFKA-10424:


[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:
```
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
```
After running MM2:
```
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3  ReplicationFactor:3  Configs:cleanup.policy=compact
    Topic: primary.test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: primary.test  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: primary.test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
```

note that `cleanup.policy=compact` is added after MM2 start
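
For reference, a hedged sketch of the same check done through the AdminClient rather than the ZooKeeper-based kafka-topics.sh call above; the bootstrap address and topic name mirror the example and are assumptions.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class CleanupPolicyCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "backup-kafka:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "primary.test");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                                 .all().get().get(topic);
            // After MM2 has synced the topic configs, this should print "compact".
            System.out.println(config.get("cleanup.policy").value());
        }
    }
}
```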

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9222: KAFKA-10437: Implement test-utils and StateStore changes for KIP-478

2020-08-25 Thread GitBox


vvcephei commented on pull request #9222:
URL: https://github.com/apache/kafka/pull/9222#issuecomment-680438318


   Hey @abbccdda , this is a Part 4 PR that I extracted out from Part 3 (#9221 
) when it became too large. If you have a chance, I'd appreciate your review. 
Especially wrt the parts that require KIP amendments.
   
   Thanks as always for your reviews!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9344) Logged consumer config does not always match actual config values

2020-08-25 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-9344.
---
Resolution: Fixed

> Logged consumer config does not always match actual config values
> -
>
> Key: KAFKA-9344
> URL: https://issues.apache.org/jira/browse/KAFKA-9344
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.4.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Major
>
> Similar to KAFKA-8928, during consumer construction some configs might be 
> overridden (client.id, for instance), but the actual values will not be 
> reflected in the info log. It would be better to display the overridden 
> values for those configs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #9222: KAFKA-10437: Implement test-utils and StateStore changes for KIP-478

2020-08-25 Thread GitBox


vvcephei commented on a change in pull request #9222:
URL: https://github.com/apache/kafka/pull/9222#discussion_r476988013



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##
@@ -61,7 +84,14 @@
  * @throws IllegalStateException If store gets registered after 
initialized is already finished
  * @throws StreamsException if the store's change log does not contain the 
partition
  */
-void init(ProcessorContext context, StateStore root);
+default void init(final ProcessorContext context, final StateStore 
root) {
+final org.apache.kafka.streams.processor.ProcessorContext adapted =
+ProcessorContextReverseAdapter.adapt(
+context,
+new 
ProcessorContextReverseAdapter.UnsupportedDeprecatedForwarder()
+);
+init(adapted, root);
+}

Review comment:
   We have to add this so that we can pass in the new ProcessorContext. The 
default implementation delegates to the old `init` method so that existing 
store implementations will function with no changes.
   
   If the only callers were internal, we could just adapt at the call site. 
Unfortunately, users can also call `StateStore#init`, and they would do it if 
they have their own store implementations or if they use `MockProcessorContext` 
to test a stateful processor.
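
A stripped-down illustration of that compatibility trick (the type names here are simplified placeholders, not the actual Streams API): the interface gains an overload for the new context type whose default implementation adapts and delegates to the old method, so existing store implementations compile and run unchanged.

```java
interface OldContext { }
interface NewContext { }

interface Store {
    // Pre-existing method that all current implementations already override.
    void init(OldContext context, Store root);

    // New entry point: by default it adapts the new context and delegates to the
    // old init, so implementations that only know the old signature still work.
    default void init(NewContext context, Store root) {
        init(adapt(context), root);
    }

    static OldContext adapt(NewContext context) {
        return new OldContext() { };  // placeholder for the real adapter
    }
}
```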

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class InternalProcessorContextReverseAdapter implements 
InternalProcessorContext {

Review comment:
   I just renamed this from `ProcessorContextReverseAdapter` (which 
confusingly implemented `InternalProcessorContext`), so that I could add an 
adapter specifically for non-Internal `ProcessorContext`s.

##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
##
@@ -36,7 +36,7 @@
 public class WordCountProcessorTest {
 @Test
 public void test() {
-final MockProcessorContext context = new MockProcessorContext();
+final MockProcessorContext context = new 
MockProcessorContext<>();

Review comment:
   Switching over to the mock of the new API.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##
@@ -45,6 +47,27 @@
  */
 String name();
 
+/**
+ * Initializes this state store.
+ * 
+ * The implementation of this function must register the root store in the 
context via the
+ * {@link 
org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, 
StateRestoreCallback)} function,
+ * where the first {@link StateStore} parameter should always be the 
passed-in {@code root} object, and
+ * the second parameter should be an object of user's implementation
+ * of the {@link StateRestoreCallback} interface used for restoring the 
state store from the changelog.
+ * 
+ * Note that if the state store engine itself supports bulk writes, users 
can implement another
+ * interface {@link BatchingStateRestoreCallback} which extends {@link 
StateRestoreCallback} to
+ * let users implement bulk-load restoration 

[GitHub] [kafka] vvcephei opened a new pull request #9222: KAFKA-10437: Implement test-utils and StateStore changes for KIP-478

2020-08-25 Thread GitBox


vvcephei opened a new pull request #9222:
URL: https://github.com/apache/kafka/pull/9222


   Propose a new init method for StateStore so that it works with the new 
ProcessorContext.
   Convert the test-utils MockProcessorContext to the new API.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10437) Convert test-utils (and StateStore) for KIP-478

2020-08-25 Thread John Roesler (Jira)
John Roesler created KAFKA-10437:


 Summary: Convert test-utils (and StateStore) for KIP-478
 Key: KAFKA-10437
 URL: https://issues.apache.org/jira/browse/KAFKA-10437
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

2020-08-25 Thread GitBox


vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r476953793



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##
@@ -145,24 +145,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
 assertThat(outputTopic.isEmpty(), is(true));
 }
 
-public static class CustomMaxAggregatorSupplier implements 
ProcessorSupplier {
+public static class CustomMaxAggregatorSupplier implements 
ProcessorSupplier {
 @Override
-public Processor get() {
+public Processor get() {
 return new CustomMaxAggregator();
 }
 }
 
-public static class CustomMaxAggregator implements Processor 
{
-ProcessorContext context;
+public static class CustomMaxAggregator implements Processor {
+ProcessorContext context;
 private KeyValueStore store;
 
 @SuppressWarnings("unchecked")
 @Override
-public void init(final ProcessorContext context) {
+public void init(final ProcessorContext context) {
 this.context = context;
 context.schedule(Duration.ofSeconds(60), 
PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
 context.schedule(Duration.ofSeconds(10), 
PunctuationType.STREAM_TIME, time -> flushStore());
-store = (KeyValueStore) 
context.getStateStore("aggStore");
+store = context.getStateStore("aggStore");

Review comment:
   This is a small improvement I noticed; I'll mention this on the KIP 
discussion if you like it. I've changed the ProcessorContext getStateStore 
method so that we don't have to cast the store type anymore. The generic 
parameters to the method take care of casting now.
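
A minimal sketch of the signature change being described (simplified types, not the verbatim Streams API): making the return type a method-level type parameter lets the call site bind it, so the explicit cast disappears.

```java
interface StateStore { }
interface KeyValueStore<K, V> extends StateStore { }

interface Context {
    // The caller's assignment target fixes S, e.g. KeyValueStore<String, Long>.
    <S extends StateStore> S getStateStore(String name);
}

class AggregatorSketch {
    KeyValueStore<String, Long> store;

    void init(Context context) {
        // Before: store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
        store = context.getStateStore("aggStore");  // type inferred, no cast needed
    }
}
```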

##
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##
@@ -658,8 +658,42 @@ public synchronized Topology addSink(final String name,
  */
 @SuppressWarnings("rawtypes")
 public synchronized Topology addProcessor(final String name,
-  final ProcessorSupplier supplier,
+  final 
org.apache.kafka.streams.processor.ProcessorSupplier supplier,
   final String... parentNames) {
+return addProcessor(
+name,
+new ProcessorSupplier() {
+@Override
+public Set> stores() {
+return supplier.stores();
+}
+
+@Override
+public 
org.apache.kafka.streams.processor.api.Processor get() {
+return ProcessorAdapter.adaptRaw(supplier.get());
+}
+},
+parentNames
+);
+}

Review comment:
   
   
   as in previous changes, delegating the old API to the new one.
   

##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##
@@ -145,24 +145,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
 assertThat(outputTopic.isEmpty(), is(true));
 }
 
-public static class CustomMaxAggregatorSupplier implements 
ProcessorSupplier {
+public static class CustomMaxAggregatorSupplier implements 
ProcessorSupplier {

Review comment:
   Since the new public API change is small, I also converted almost all of 
the usages of the old API to the new one.

##
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final 
StoreBuilder storeBuilder,
 return this;
 }
 
+/**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
+ * 
+ * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
+ * of the input topic.
+ * 
+ * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+ * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
+ *
+ * @param storeBuilder  user defined state store builder
+ * @param sourceNamename of the {@link SourceNode} that will 
be automatically added
+ * @param keyDeserializer   the {@link Deserializer} to deserialize 
keys with
+ * @param valueDeserializer the {@link Deserializer} to deserialize 
values with
+ * @param topic the topic to source the 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-25 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r476962075



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##
@@ -416,26 +552,43 @@ private long currentSegmentLastTime() {
 }
 
 private void getNextSegmentIterator() {
-++currentSegmentId;
-lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
+if (forward) {
+++currentSegmentId;
+lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
 
-if (currentSegmentId > lastSegmentId) {
-current = null;
-return;
-}
+if (currentSegmentId > lastSegmentId) {
+current = null;
+return;
+}
 
-setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
+setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
 
-current.close();
-current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+current.close();
+
+current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+} else {
+--currentSegmentId;
+//lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));

Review comment:
   Guessing this is not actually meant to be commented out  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-25 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r476969847



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
##
@@ -104,7 +121,47 @@ public V fetch(final K key, final long time) {
 
 return new KeyValueIterator, V>() {
 @Override
-public void close() {}
+public void close() {
+}
+
+@Override
+public Windowed peekNextKey() {
+throw new UnsupportedOperationException("peekNextKey() not 
supported in " + getClass().getName());
+}
+
+@Override
+public boolean hasNext() {
+return iterator.hasNext();
+}
+
+@Override
+public KeyValue, V> next() {
+return iterator.next();
+}
+
+};
+}
+
+@Override
+public KeyValueIterator, V> backwardAll() {
+if (!open) {
+throw new InvalidStateStoreException("Store is not open");
+}
+final List, V>> results = new ArrayList<>();
+for (final long now : data.keySet()) {

Review comment:
   Same here (and all the backwards ReadOnlyWindowStoreStub methods): I 
think we are kind of forced to invert the key ordering for the backwards fetch 
methods as well, even if we don't necessarily want to. Probably users shouldn't 
be relying on a strict ordering of the keys anyway but we do have to match the 
ordering of the cache
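
A small, self-contained sketch of the ordering point (the data layout is illustrative, not the stub's actual fields): iterating the time-keyed map in descending order for a backward fetch also reverses the order in which entries come back, so the stub has to mirror the cache's inverted ordering rather than keep the forward order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class BackwardIterationSketch {
    public static void main(String[] args) {
        NavigableMap<Long, String> data = new TreeMap<>();
        data.put(1L, "a");
        data.put(2L, "b");
        data.put(3L, "c");

        List<String> results = new ArrayList<>();
        // descendingKeySet() walks the timestamps newest-first, inverting the ordering.
        for (final long time : data.descendingKeySet()) {
            results.add(data.get(time));
        }
        System.out.println(results);  // prints [c, b, a]
    }
}
```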

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##
@@ -201,18 +203,48 @@ public synchronized void put(final Bytes key,
 }
 
 final PeekingKeyValueIterator cacheIterator = 
wrapped().persistent() ?
-new CacheIteratorWrapper(key, timeFrom, timeTo) :
-context.cache().range(cacheName,
-
cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom)),
-
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo))
+new CacheIteratorWrapper(key, timeFrom, timeTo, true) :
+context.cache().range(
+cacheName,
+cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, 
timeFrom)),
+cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, 
timeTo))
 );
 
 final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(key, key, timeFrom, timeTo);
-final PeekingKeyValueIterator 
filteredCacheIterator = new FilteredCacheIterator(
-cacheIterator, hasNextCondition, cacheFunction
-);
+final PeekingKeyValueIterator 
filteredCacheIterator =
+new FilteredCacheIterator(cacheIterator, hasNextCondition, 
cacheFunction);
 
-return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, 
underlyingIterator);
+return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, 
underlyingIterator, true);
+}
+
+@Override
+public synchronized WindowStoreIterator backwardFetch(final Bytes 
key,
+  final 
Instant from,
+  final 
Instant to) {

Review comment:
   I guess we should use the long signature here too, and do the conversion 
from Instant to long in a default implementation on the WindowStore interface?
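
A hedged sketch of that suggestion (interface and method shapes are simplified, not the real WindowStore API): implementations override only the long-based method, and a default method on the interface performs the Instant-to-millis conversion.

```java
import java.time.Instant;

public class InstantOverloadSketch {
    interface WindowedFetcher<V> {
        // The long-based method is the one implementations actually provide.
        V fetch(String key, long timeFrom, long timeTo);

        // The Instant-based overload converts and delegates, so there is only
        // one place that does the conversion.
        default V fetch(String key, Instant from, Instant to) {
            return fetch(key, from.toEpochMilli(), to.toEpochMilli());
        }
    }

    public static void main(String[] args) {
        WindowedFetcher<String> fetcher = (key, from, to) -> key + "[" + from + "," + to + ")";
        System.out.println(fetcher.fetch("k", Instant.EPOCH, Instant.ofEpochMilli(1000)));
    }
}
```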

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##
@@ -416,26 +552,43 @@ private long currentSegmentLastTime() {
 }
 
 private void getNextSegmentIterator() {
-++currentSegmentId;
-lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
+if (forward) {
+++currentSegmentId;
+lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
 
-if (currentSegmentId > lastSegmentId) {
-current = null;
-return;
-}
+if (currentSegmentId > lastSegmentId) {
+current = null;
+return;
+}
 
-setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
+setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
 
-current.close();
-current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+current.close();
+
+current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+} else {
+--currentSegmentId;
+//lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));

Review comment:
  

[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-08-25 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184837#comment-17184837
 ] 

Guozhang Wang commented on KAFKA-10134:
---

[~zhowei] could you try out https://github.com/apache/kafka/pull/8834 and let me 
know whether it fixes the issue?

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.5.2, 2.6.1
>
> Attachments: consumer3.log.2020-08-20.log, 
> consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during rebalances, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some of it long-running tasks >30s), the CPU usage goes 
> sky-high. It reads ~700% in our metrics, so several threads must be in a tight 
> loop. We have several consumer threads consuming from different partitions 
> during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The difference 
> is that with the old eager rebalance protocol the high CPU usage drops after 
> the rebalance is done, whereas with the cooperative one the consumer threads 
> seem to be stuck on something and cannot finish the rebalance, so the high CPU 
> usage does not drop until we stop our load. A small load without long-running 
> tasks also does not cause continuously high CPU usage, as the rebalance can 
> finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> trying to find the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU usage returns to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue, so it 
> appears to be related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-25 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r476950994



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -483,12 +492,7 @@ private synchronized void resetStateAndRejoin() {
 // rebalance in the call to poll below. This ensures that we do not 
mistakenly attempt
 // to rejoin before the pending rebalance has completed.
 if (joinFuture == null) {
-// fence off the heartbeat thread explicitly so that it cannot 
interfere with the join group.
-// Note that this must come after the call to onJoinPrepare since 
we must be able to continue
-// sending heartbeats if that callback takes some time.
-disableHeartbeatThread();

Review comment:
   We do not need to explicitly disable the heartbeat thread: once the state 
transitions to PREPARING_REBALANCE, the thread will disable itself in its next 
iteration.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -326,8 +331,9 @@ protected synchronized void pollHeartbeat(long now) {
 }
 
 protected synchronized long timeToNextHeartbeat(long now) {
-// if we have not joined the group, we don't need to send heartbeats
-if (state == MemberState.UNJOINED)
+// if we have not joined the group or we are preparing rebalance,

Review comment:
   This is the major fix 2) in the description.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -917,17 +938,14 @@ private synchronized void resetGeneration() {
 synchronized void resetGenerationOnResponseError(ApiKeys api, Errors 
error) {
 log.debug("Resetting generation after encountering {} from {} response 
and requesting re-join", error, api);
 
-// only reset the state to un-joined when it is not already in 
rebalancing

Review comment:
   We do not need this check any more: we now only reset the generation when we 
see an illegal generation or an unknown member id, and in either case we should 
no longer send heartbeats.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() {
 joinFuture.addListener(new RequestFutureListener() {
 @Override
 public void onSuccess(ByteBuffer value) {
-// handle join completion in the callback so that the 
callback will be invoked

Review comment:
   Moved this into sync-group handler for readability.

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -1098,44 +1136,6 @@ public void 
testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exceptio
 awaitFirstHeartbeat(heartbeatReceived);
 }
 
-@Test
-public void testWakeupAfterSyncGroupSent() throws Exception {

Review comment:
   This is now a redundant test.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
   responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
 case CompletingRebalance =>
-responseCallback(Errors.REBALANCE_IN_PROGRESS)
+  // consumers may start sending heartbeat after join-group 
response, in which case
+  // we should treat them as normal hb request and reset the timer
+  val member = group.get(memberId)

Review comment:
   This is the only logical change as 3) in the description. All others are 
logging changes.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -1311,9 +1324,10 @@ public void run() {
 continue;
 }
 
-if (state != MemberState.STABLE) {
-// the group is not stable (perhaps because we 
left the group or because the coordinator
-// kicked us out), so disable heartbeats and wait 
for the main thread to rejoin.
+// we do not need to heartbeat we are not part of a 
group yet;
+// also if we already have fatal error, the client 
will be
+// crashed soon, hence we do not need to continue 
heartbeating either
+if (state.hasNotJoinedGroup() || hasFailed()) {

Review comment:
   This is the major fix 1).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use 

[GitHub] [kafka] vvcephei opened a new pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

2020-08-25 Thread GitBox


vvcephei opened a new pull request #9221:
URL: https://github.com/apache/kafka/pull/9221


   Converts `Topology#addProcessor` and `#addGlobalStore`
   Also, convert some of the internals in support of `addProcessor`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-25 Thread GitBox


guozhangwang commented on pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#issuecomment-680405410


   @ableegoldman @vvcephei @hachikuji Please take a look at the updated 
description on the top.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10436) Implement KIP-478 Topology changes

2020-08-25 Thread John Roesler (Jira)
John Roesler created KAFKA-10436:


 Summary: Implement KIP-478 Topology changes
 Key: KAFKA-10436
 URL: https://issues.apache.org/jira/browse/KAFKA-10436
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10379) Implement the KIP-478 StreamBuilder#addGlobalStore()

2020-08-25 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10379.
--
Resolution: Fixed

> Implement the KIP-478 StreamBuilder#addGlobalStore()
> 
>
> Key: KAFKA-10379
> URL: https://issues.apache.org/jira/browse/KAFKA-10379
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10379) Implement the KIP-478 StreamBuilder#addGlobalStore()

2020-08-25 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10379:
-
Fix Version/s: 2.7.0

> Implement the KIP-478 StreamBuilder#addGlobalStore()
> 
>
> Key: KAFKA-10379
> URL: https://issues.apache.org/jira/browse/KAFKA-10379
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lbradstreet commented on a change in pull request #9219: KAFKA-10432: LeaderEpochCache is incorrectly recovered for leader epoch 0

2020-08-25 Thread GitBox


lbradstreet commented on a change in pull request #9219:
URL: https://github.com/apache/kafka/pull/9219#discussion_r476925008



##
File path: core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
##
@@ -367,6 +371,45 @@ class LogSegmentTest {
 assertEquals(100L, abortedTxn.lastStableOffset)
   }
 
+  /**
+   * Create a segment with some data, then recover the segment.
+   * The epoch cache entries should reflect the segment.
+   */
+  @Test
+  def testRecoveryRebuildsEpochCache(): Unit = {
+val seg = createSegment(0)
+
+val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+  private var epochs = Seq.empty[EpochEntry]
+
+  override def write(epochs: Seq[EpochEntry]): Unit = {
+this.epochs = epochs.toVector
+  }
+
+  override def read(): Seq[EpochEntry] = this.epochs
+}
+
+val cache = new LeaderEpochFileCache(topicPartition, () => 
seg.readNextOffset, checkpoint)
+seg.append(largestOffset = 105L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
+  shallowOffsetOfMaxTimestamp = 104L, MemoryRecords.withRecords(104L, 
CompressionType.NONE, 0,
+new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+seg.append(largestOffset = 107L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
+  shallowOffsetOfMaxTimestamp = 106L, MemoryRecords.withRecords(106L, 
CompressionType.NONE, 1,
+new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+seg.append(largestOffset = 109L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
+  shallowOffsetOfMaxTimestamp = 108L, MemoryRecords.withRecords(108L, 
CompressionType.NONE, 1,
+new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+seg.append(largestOffset = 111L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
+  shallowOffsetOfMaxTimestamp = 110, MemoryRecords.withRecords(110L, 
CompressionType.NONE, 2,
+new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+seg.recover(new ProducerStateManager(topicPartition, logDir), Some(cache))
+assertEquals(ArrayBuffer(EpochEntry(0, 104L), EpochEntry(epoch=1, 
startOffset=106), EpochEntry(epoch=2, startOffset=110)), cache.epochEntries)

Review comment:
   Fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9219: KAFKA-10432: LeaderEpochCache is incorrectly recovered for leader epoch 0

2020-08-25 Thread GitBox


hachikuji commented on a change in pull request #9219:
URL: https://github.com/apache/kafka/pull/9219#discussion_r476915997



##
File path: core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
##
@@ -367,6 +371,45 @@ class LogSegmentTest {
 assertEquals(100L, abortedTxn.lastStableOffset)
   }
 
+  /**
+   * Create a segment with some data, then recover the segment.
+   * The epoch cache entries should reflect the segment.
+   */
+  @Test
+  def testRecoveryRebuildsEpochCache(): Unit = {
+val seg = createSegment(0)
+
+val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+  private var epochs = Seq.empty[EpochEntry]
+
+  override def write(epochs: Seq[EpochEntry]): Unit = {
+this.epochs = epochs.toVector
+  }
+
+  override def read(): Seq[EpochEntry] = this.epochs
+}
+
+val cache = new LeaderEpochFileCache(topicPartition, () => 
seg.readNextOffset, checkpoint)
+seg.append(largestOffset = 105L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
+  shallowOffsetOfMaxTimestamp = 104L, MemoryRecords.withRecords(104L, 
CompressionType.NONE, 0,
+new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+seg.append(largestOffset = 107L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
+  shallowOffsetOfMaxTimestamp = 106L, MemoryRecords.withRecords(106L, 
CompressionType.NONE, 1,
+new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+seg.append(largestOffset = 109L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
+  shallowOffsetOfMaxTimestamp = 108L, MemoryRecords.withRecords(108L, 
CompressionType.NONE, 1,
+new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+seg.append(largestOffset = 111L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
+  shallowOffsetOfMaxTimestamp = 110, MemoryRecords.withRecords(110L, 
CompressionType.NONE, 2,
+new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+seg.recover(new ProducerStateManager(topicPartition, logDir), Some(cache))
+assertEquals(ArrayBuffer(EpochEntry(0, 104L), EpochEntry(epoch=1, 
startOffset=106), EpochEntry(epoch=2, startOffset=110)), cache.epochEntries)

Review comment:
   nit: use arg names consistently?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-25 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184804#comment-17184804
 ] 

Sophie Blee-Goldman edited comment on KAFKA-10434 at 8/26/20, 12:20 AM:


Also, if there are performance concerns with using Instant, then why did we 
deprecate all of the public APIs that use long? We shouldn't set the bar lower 
for users of IQ, we have no idea what their performance requirements might be 
and they could be making many calls per second. I think we should un-deprecate 
them...

Of course, if we're going to be completely overhauling IQ shortly anyways, 
maybe it's not worth a KIP to un-deprecate it (do we actually need a KIP for 
that?) It just seems weird to force the tradeoff onto users of IQ 


was (Author: ableegoldman):
Also, if there are performance concerns with using Instant, then why did we 
deprecate all of the public APIs that use long? We shouldn't set the bar lower 
for our own internal use vs users of IQ, we have no idea what their performance 
requirements might be and they could be making many calls per second. I think 
we should un-deprecate them...

Of course, if we're going to be completely overhauling IQ shortly anyways, 
maybe it's not worth a KIP to un-deprecate it (do we actually need a KIP for 
that?) It just seems weird to force the tradeoff onto users of IQ 

> Remove deprecated methods on WindowStore
> 
>
> Key: KAFKA-10434
> URL: https://issues.apache.org/jira/browse/KAFKA-10434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and 
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606] :
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
>  * Consider removing long based methods



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-25 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r476065182



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -287,7 +287,7 @@ class GroupCoordinator(val brokerId: Int,
 
   group.currentState match {
 case PreparingRebalance =>
-  updateMemberAndRebalance(group, member, protocols, 
responseCallback)
+  updateMemberAndRebalance(group, member, protocols, s"Member 
${member.memberId} joining group during ${group.currentState}", 
responseCallback)

Review comment:
   Yes, it only contains logging changes. But I will make some non logging 
changes later and will mark it explicitly.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
 }
 
 private void recordRebalanceFailure() {
-state = MemberState.UNJOINED;

Review comment:
   Yes I agree, I think we should just let the heartbeat thread access the 
state itself and then based on that decide whether or not to send heartbeats, I 
will update this logic.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -1069,6 +1069,13 @@ private HeartbeatResponseHandler(final Generation 
generation) {
 public void handle(HeartbeatResponse heartbeatResponse, 
RequestFuture future) {
 sensors.heartbeatSensor.record(response.requestLatencyMs());
 Errors error = heartbeatResponse.error();
+
+if (state != MemberState.STABLE) {

Review comment:
   My thinking was that, while we are rebalancing, the purpose of the 
heartbeat is only to keep the consumer alive on the broker side, not to take 
any instructions. But I think it should be handled case-by-case, and I will 
try to refactor this piece a bit as well.
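
   To make that intent concrete, here is a minimal, hypothetical sketch (the 
MemberState enum and method names below are made up for illustration, not the 
actual AbstractCoordinator code) of a heartbeat thread gating sends on the 
member state it reads itself:

{code:java}
// Minimal sketch under assumed names; not the real coordinator code.
enum MemberState { UNJOINED, REBALANCING, STABLE }

class HeartbeatGateSketch {
    private volatile MemberState state = MemberState.UNJOINED;

    // The heartbeat thread reads the state itself and decides whether to send.
    boolean shouldSendHeartbeat() {
        // Keep the member alive while stable or while a rebalance is in progress,
        // but not before it has joined the group at all.
        return state == MemberState.STABLE || state == MemberState.REBALANCING;
    }

    void transitionTo(final MemberState newState) {
        this.state = newState;
    }
}
{code}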

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -604,6 +605,25 @@ public void 
testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int
 assertEquals(newGen, coordinator.generation());
 }
 
+@Test
+public void testHeartbeatSentWhenRebalancing() throws Exception {
+setupCoordinator();
+joinGroup();
+
+final AbstractCoordinator.Generation currGen = 
coordinator.generation();
+
+coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+
+// the heartbeat thread should be sent out during a rebalance
+mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 
2000,
+"The heartbeat request was not sent");
+assertTrue(coordinator.heartbeat().hasInflight());
+
+mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));

Review comment:
   Actually we do not need to :)

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
 }
 
 private void recordRebalanceFailure() {

Review comment:
   Yup, I will just inline this then.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-25 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184804#comment-17184804
 ] 

Sophie Blee-Goldman commented on KAFKA-10434:
-

Also, if there are performance concerns with using Instant, then why did we 
deprecate all of the public APIs that use long? We shouldn't set the bar lower 
for our own internal use vs users of IQ, we have no idea what their performance 
requirements might be and they could be making many calls per second. I think 
we should un-deprecate them...

Of course, if we're going to be completely overhauling IQ shortly anyways, 
maybe it's not worth a KIP to un-deprecate it (do we actually need a KIP for 
that?) It just seems weird to force the tradeoff onto users of IQ 

> Remove deprecated methods on WindowStore
> 
>
> Key: KAFKA-10434
> URL: https://issues.apache.org/jira/browse/KAFKA-10434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and 
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606] :
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
>  * Consider removing long based methods



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lbradstreet commented on pull request #9213: MINOR: add epoch lineage checks to system tests

2020-08-25 Thread GitBox


lbradstreet commented on pull request #9213:
URL: https://github.com/apache/kafka/pull/9213#issuecomment-680322962


   This PR was able to help find KAFKA-10432: 
https://github.com/apache/kafka/pull/9219



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-08-25 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r476864248



##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationFailureException.java
##
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception used to indicate a broker side authorization failure during 
request redirection.
+ */
+public class BrokerAuthorizationFailureException extends 
AuthorizationException {
+

Review comment:
   Interesting, why does the `AuthorizationException` have no 
`serialVersionUID`? Is it because we never use that error code explicitly?
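
   For reference, a minimal sketch of what pinning a `serialVersionUID` on a 
serializable exception usually looks like (the class below is made up for 
illustration and is not the actual Kafka exception):

{code:java}
// Hypothetical example class, not part of Kafka.
public class ExampleAuthorizationException extends RuntimeException {
    // Pinning the UID keeps Java serialization stable across recompiles;
    // omitting it lets the JVM derive one from the class structure.
    private static final long serialVersionUID = 1L;

    public ExampleAuthorizationException(final String message) {
        super(message);
    }
}
{code}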





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-08-25 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r476860796



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
##
@@ -87,6 +87,16 @@ public Builder(Map configs, boolean 
validateOnly) {
 public AlterConfigsRequest build(short version) {
 return new AlterConfigsRequest(data, version);
 }
+

Review comment:
   The purpose is for the mock tests to compare the expected builder in 
`KafkaApisTest`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10435) Fetch protocol changes for KIP-595

2020-08-25 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-10435:

Description: KIP-595 makes several changes to the Fetch protocol. Since 
this affects inter-broker communication, it is useful to do this separately.  
(was: KIP-595 makes several changes to the Fetch protocol. Since this affects 
inter-broker communication, it is useful to separate this into a separate 
change.)

> Fetch protocol changes for KIP-595
> --
>
> Key: KAFKA-10435
> URL: https://issues.apache.org/jira/browse/KAFKA-10435
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> KIP-595 makes several changes to the Fetch protocol. Since this affects 
> inter-broker communication, it is useful to do this separately.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-25 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r476849786



##
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##
@@ -119,15 +118,16 @@
  * 
  * This iterator must be closed after use.
  *
- * @param from  the first key in the range
- * @param tothe last key in the range
- * @param timeFrom  time range start (inclusive)
- * @param timeTotime range end (inclusive)
+ * @param from the first key in the range
+ * @param to   the last key in the range
+ * @param timeFrom time range start (inclusive)
+ * @param timeTo   time range end (inclusive)
  * @return an iterator over windowed key-value pairs {@code , 
value>}
  * @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException if one of the given keys is {@code null}
+ * @throws NullPointerException   if one of the given keys is {@code 
null}
  */
-@SuppressWarnings("deprecation") // note, this method must be kept if 
super#fetch(...) is removed
+// note, this method must be kept if super#fetch(...) is removed
+@SuppressWarnings("deprecation")
 KeyValueIterator, V> fetch(K from, K to, long timeFrom, long 
timeTo);

Review comment:
   Ok, given @mjsax 's response on 
[KAFKA-10434](https://issues.apache.org/jira/browse/KAFKA-10434) it seems like 
we actually need to add the reverse variation of these long-based methods to 
the WindowStore API





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10435) Fetch protocol changes for KIP-595

2020-08-25 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10435:
---

 Summary: Fetch protocol changes for KIP-595
 Key: KAFKA-10435
 URL: https://issues.apache.org/jira/browse/KAFKA-10435
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson


KIP-595 makes several changes to the Fetch protocol. Since this affects 
inter-broker communication, it is useful to separate this into a separate 
change.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-25 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184778#comment-17184778
 ] 

Sophie Blee-Goldman commented on KAFKA-10434:
-

Gotcha. Then can we at least put this explanation in the comments? The current 
{code:java}
// note, this method must be kept if super#fetch(...) is removed
{code}
comments are not particularly illuminating. 

[~jeqo] I guess we would indeed want to use the `long` version of the signature 
for the underlying store in the reverse iterator PR (and add them to the 
WindowStore API). 

Thanks for clarifying [~mjsax]

 

> Remove deprecated methods on WindowStore
> 
>
> Key: KAFKA-10434
> URL: https://issues.apache.org/jira/browse/KAFKA-10434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and 
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606] :
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
>  * Consider removing long based methods



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9217: MINOR: fix JavaDoc

2020-08-25 Thread GitBox


mjsax commented on pull request #9217:
URL: https://github.com/apache/kafka/pull/9217#issuecomment-680310495


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax closed pull request #9217: MINOR: fix JavaDoc

2020-08-25 Thread GitBox


mjsax closed pull request #9217:
URL: https://github.com/apache/kafka/pull/9217


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184776#comment-17184776
 ] 

Matthias J. Sax commented on KAFKA-10434:
-

We want to keep those methods, as discussed on the KIP, for performance 
reasons. Only for the "ReadOnly" stores that are used for IQ do we want to have 
the methods using `Instant` and `Duration`, as they provide a better API. 
However, for the "read-write" interfaces we want to keep the more performant 
`long` variants to avoid unnecessary (short-lived) object creation.

Atm, we inherit the `long` variants from the read-only interface. However, when 
we remove those methods from the read-only interface, we need to declare them 
on the "read-write" interface. The original PR did this "preparation" already 
by "re-declaring" the methods (that is kinda redundant atm, but it allows us 
to mark the methods as "not deprecated" in the "read-write" classes). Hence, we 
don't mark those methods as `@Deprecated` as we don't intend to remove them, 
but we need to suppress the deprecation warning as long as we inherit the 
methods from the read-only interfaces.
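
As a rough illustration of that "re-declare to un-deprecate" pattern 
(hypothetical interfaces, not the real Streams API):

{code:java}
import java.time.Instant;

// Simplified stand-ins for the read-only and read-write store interfaces.
interface ExampleReadOnlyWindowStore<K, V> {
    // Instant-based variant preferred for interactive queries.
    V fetch(K key, Instant time);

    // long-based variant that would eventually leave the read-only interface.
    @Deprecated
    V fetch(K key, long time);
}

interface ExampleWindowStore<K, V> extends ExampleReadOnlyWindowStore<K, V> {
    // Re-declared without @Deprecated: the read-write interface keeps the
    // more performant long-based overload even if the read-only one goes away.
    @Override
    V fetch(K key, long time);
}
{code}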

I think we should close this ticket as "invalid". \cc [~ableegoldman]

> Remove deprecated methods on WindowStore
> 
>
> Key: KAFKA-10434
> URL: https://issues.apache.org/jira/browse/KAFKA-10434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and 
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606] :
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
>  * Consider removing long based methods



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeqo commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-25 Thread GitBox


jeqo commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r476796186



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##
@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final 
long windowStartTimes
 @Deprecated
 @Override
 public WindowStoreIterator fetch(final Bytes key, final long 
timeFrom, final long timeTo) {
+return fetch(key, timeFrom, timeTo, true);
+}
+
+@Override
+public WindowStoreIterator backwardFetch(final Bytes key, final 
Instant from, final Instant to) {
+final long timeFrom = ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+final long timeTo = ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));

Review comment:
   https://issues.apache.org/jira/browse/KAFKA-10434 created to follow up 
this.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##
@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final 
long windowStartTimes
 @Deprecated
 @Override
 public WindowStoreIterator fetch(final Bytes key, final long 
timeFrom, final long timeTo) {
+return fetch(key, timeFrom, timeTo, true);
+}
+
+@Override
+public WindowStoreIterator backwardFetch(final Bytes key, final 
Instant from, final Instant to) {
+final long timeFrom = ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+final long timeTo = ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));

Review comment:
   https://issues.apache.org/jira/browse/KAFKA-10434 created to follow this 
up.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-25 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-10434:


 Summary: Remove deprecated methods on WindowStore
 Key: KAFKA-10434
 URL: https://issues.apache.org/jira/browse/KAFKA-10434
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Jorge Esteban Quilcate Otoya


>From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and 
>[https://github.com/apache/kafka/pull/9138#discussion_r474995606] :

WindowStore contains ReadOnlyWindowStore methods.

We could consider:
 * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
 * Consider removing long based methods



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-25 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r476783905



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -211,6 +217,67 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 }
 }
 
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp rightWinAgg = null;
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+boolean rightWinAlreadyCreated = false;
+final HashSet windowStartTimes = new HashSet();
+
+try (
+final KeyValueIterator, 
ValueAndTimestamp> iterator = windowStore.fetch(
+key,
+key,
+timestamp - 2 * windows.timeDifferenceMs(),

Review comment:
   See above -- even if it seems to be working I think we should restrict 
the range anyway. We know that there's no windows earlier than 0 so why extend 
the query beyond this?
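
   A tiny sketch of that suggestion, using the variable names from the diff 
above as assumptions:

{code:java}
// Sketch only: clamp the lower bound of the early-record range query at 0,
// since no window can start before time 0.
public class ClampFetchRangeSketch {
    static long lowerBound(final long recordTimestamp, final long timeDifferenceMs) {
        return Math.max(0L, recordTimestamp - 2 * timeDifferenceMs);
    }

    static long upperBound(final long recordTimestamp) {
        // still catches the record's right window without another store call
        return recordTimestamp + 1;
    }
}
{code}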





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-25 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r476781126



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean rightWinAlreadyCreated = false;
 
 // keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+KeyValue, ValueAndTimestamp> latestLeftTypeWindow 
= null;
 try (
 final KeyValueIterator, 
ValueAndTimestamp> iterator = windowStore.fetch(

Review comment:
   Are you sure it's actually returning something? Have you tested it with 
a rocksdb store or just with the in-memory store? I think the in-memory store 
would handle this fine since it never serializes the key/timestamps, but if you 
have a rocksdb store (or a caching layer) then the range query works by looking 
up any data between the serialized bounds. Unfortunately a negative long is 
lexicographically greater than a positive long when serialized to bytes. The 
"negative" is encoded as a leading 1 -- which means the lower bound ends up 
being "larger" than the upper bound.
   
   I would assume that would result in no data being returned, but I'm not 
actually 100% sure what would happen
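
   A quick self-contained check of that byte-ordering point (assuming the 
store compares serialized big-endian longs lexicographically, as described 
above):

{code:java}
import java.nio.ByteBuffer;

public class SignedLongByteOrderCheck {
    static byte[] toBytes(final long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Unsigned lexicographic comparison, as a byte-based range query would do.
    static int compareUnsigned(final byte[] a, final byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(final String[] args) {
        // -10 serializes with a leading 1 bit, so it sorts AFTER 100: the "lower"
        // bound of a [-10, 100] byte range ends up larger than the upper bound.
        System.out.println(compareUnsigned(toBytes(-10L), toBytes(100L)) > 0); // true
    }
}
{code}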





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-25 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r476781126



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean rightWinAlreadyCreated = false;
 
 // keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+KeyValue, ValueAndTimestamp> latestLeftTypeWindow 
= null;
 try (
 final KeyValueIterator, 
ValueAndTimestamp> iterator = windowStore.fetch(

Review comment:
   Are you sure it's actually returning something? Have you tested it with 
a rocksdb store or just with the in-memory store? I think the in-memory store 
would handle this fine since it never serializes the key/timestamps, but if you 
have a rocksdb store (or a caching layer) then the range query works by looking 
up any data between the serialized bounds. Unfortunately a negative long is 
lexicographically greater than a positive long when serialized to bytes. The 
"negative" is encoded as a leading 1 -- which means the lower bound ends up 
being "larger" than the upper bound. I assume that would result in no data 
being returned, but I'm not actually 100% sure what would happen





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9217: MINOR: fix JavaDoc

2020-08-25 Thread GitBox


vvcephei commented on a change in pull request #9217:
URL: https://github.com/apache/kafka/pull/9217#discussion_r476754081



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##
@@ -106,7 +105,7 @@ void register(final StateStore store,
 
 /**
  * Schedules a periodic operation for processors. A processor may call 
this method during
- * {@link 
Processor#init(org.apache.kafka.streams.processor.ProcessorContext) 
initialization} or
+ * {@link Processor#init(ProcessorContext) initialization} or

Review comment:
   Good catch.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java
##
@@ -17,11 +17,12 @@
 package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
 
 /**
- * Indicates that Kafka Streams is in state {@link KafkaStreams.State#CREATED 
CREATED} and thus state stores cannot be queries yet.
+ * Indicates that Kafka Streams is in state {@link State CREATED} and thus 
state stores cannot be queries yet.

Review comment:
   Ah!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] garmes-gdev commented on a change in pull request #7561: [WIP] KAFKA-7739: Tiered storage

2020-08-25 Thread GitBox


garmes-gdev commented on a change in pull request #7561:
URL: https://github.com/apache/kafka/pull/7561#discussion_r476743028



##
File path: build.gradle
##
@@ -1932,6 +1954,82 @@ project(':connect:basic-auth-extension') {
   }
 }
 
+project(':remote-storage-managers:hdfs') {
+  archivesBaseName = "kafka-rsm-hdfs"
+
+  configurations {
+localDeps
+  }
+
+  dependencies {
+localDeps group: 'org.apache.hadoop', name: 'hadoop-client', version: 
'3.1.2'
+compile configurations.localDeps
+compile project(':core')
+
+testCompile libs.junit
+testCompile group: 'org.apache.hadoop', name: 'hadoop-minicluster', 
version: '3.1.2'
+testCompile project(':core')
+testCompile project(':core').sourceSets.test.output
+testCompile project(':clients').sourceSets.test.output
+testRuntime libs.slf4jlog4j
+  }
+
+  checkstyle {
+configProperties = 
checkstyleConfigProperties("import-control-remote-storage-managers.xml")
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+from(configurations.localDeps - project(':core').configurations.runtime - 
project(':tools').configurations.runtime) {
+  exclude('kafka-rsm-hdfs*')
+}
+into "$buildDir/dependant-libs"
+duplicatesStrategy 'exclude'
+  }
+
+  jar {
+dependsOn copyDependantLibs
+  }
+}
+
+project(':remote-storage-managers:s3') {
+  archivesBaseName = "kafka-rsm-s3"
+
+  configurations {
+localDeps
+  }
+
+  dependencies {
+compile group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: 
'1.11.726'

Review comment:
   Hello @satishd, I think you need to replace 'compile' with 'localDeps' in 
order to generate "dependant-libs". 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] garmes-gdev commented on a change in pull request #7561: [WIP] KAFKA-7739: Tiered storage

2020-08-25 Thread GitBox


garmes-gdev commented on a change in pull request #7561:
URL: https://github.com/apache/kafka/pull/7561#discussion_r476741716



##
File path: 
remote-storage-managers/s3/src/main/java/org/apache/kafka/rsm/s3/S3RemoteStorageManagerConfig.java
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.rsm.s3;
+
+import java.util.Map;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+
+/**
+ * A configuration for {@link S3RemoteStorageManager}.
+ */
+public class S3RemoteStorageManagerConfig extends AbstractConfig {
+public static final String S3_BUCKET_NAME_CONFIG = "s3.bucket.name";

Review comment:
   Hello @satishd, I guess you missed the prefix 
REMOTE_STORAGE_MANAGER_CONFIG_PREFIX.
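
   For illustration, a minimal sketch of adding such a prefix to the config 
key constants (the prefix value and names here are assumptions, not the PR's 
actual code):

{code:java}
public final class PrefixedConfigSketch {
    // Hypothetical prefix; the real constant is defined elsewhere in the PR.
    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX = "remote.storage.manager.";

    // Prefixed key, so it lines up with how the broker passes the config through.
    public static final String S3_BUCKET_NAME_CONFIG =
        REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + "s3.bucket.name";

    private PrefixedConfigSketch() {
    }
}
{code}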





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] garmes-gdev commented on a change in pull request #7561: [WIP] KAFKA-7739: Tiered storage

2020-08-25 Thread GitBox


garmes-gdev commented on a change in pull request #7561:
URL: https://github.com/apache/kafka/pull/7561#discussion_r476740137



##
File path: core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
##
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import java.io.{File, InputStream}
+import java.nio.file.{Files, Path}
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.function.{Consumer, Function}
+
+import kafka.log.{CleanableIndex, Log, OffsetIndex, OffsetPosition, TimeIndex, 
TransactionIndex, TxnIndexSearchResult}
+import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.common.errors.CorruptRecordException
+import org.apache.kafka.common.log.remote.storage.{RemoteLogSegmentId, 
RemoteLogSegmentMetadata, RemoteStorageManager}
+import org.apache.kafka.common.utils.{KafkaThread, Utils}
+
+object RemoteIndexCache {
+  val DirName = "remote-log-index-cache"

Review comment:
   Hello @satishd, I am testing the PR locally and I hit a problem during 
Kafka restart: org.apache.kafka.common.KafkaException: Found directory 
/tmp/kafka-logs/remote-log-index-cache, 'remote-log-index-cache' is not in the 
form of topic-partition or topic-partition.uniqueId-delete (if marked for 
deletion). Kafka's log directories (and children) should only contain Kafka 
topic data.
   Best.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10417) suppress() with cogroup() throws ClassCastException

2020-08-25 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10417:
-
Fix Version/s: (was: 2.8.0)
   (was: 3.0.0)
   2.7.0

> suppress() with cogroup() throws ClassCastException
> ---
>
> Key: KAFKA-10417
> URL: https://issues.apache.org/jira/browse/KAFKA-10417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Wardha Perinkada Kattu
>Priority: Blocker
>  Labels: kafka-streams
> Fix For: 2.7.0
>
>
> Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` 
> throws `ClassCastException`
> Works fine without the `suppress()`
> Code block tested -
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
> .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
> .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.confirmationsSerde()))
> val cogrouped = 
> stream1.cogroup(notificationAggregator).cogroup(streams2, 
> confirmationsAggregator)
> 
> .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong(
> .aggregate({ null }, Materialized.`as` NotificationMetric, WindowStore ByteArray>>("time-windowed-aggregated-stream-store")
> 
> .withValueSerde(serdesConfig.notificationMetricSerde()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()))
> .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class 
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to 
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
> (org.apache.kafka.streams.kstream.internals.PassThrough and 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9220: KAFKA-10433 Reuse the ByteBuffer in validating compressed records

2020-08-25 Thread GitBox


chia7712 commented on pull request #9220:
URL: https://github.com/apache/kafka/pull/9220#issuecomment-680194628


   > Would you be OK if I submit that as a PR and we can compare?
   
   Please feel free to submit another PR. We would all love to see a better 
solution :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9220: KAFKA-10433 Reuse the ByteBuffer in validating compressed records

2020-08-25 Thread GitBox


ijuma commented on pull request #9220:
URL: https://github.com/apache/kafka/pull/9220#issuecomment-680182676


   @chia7712 Thanks for the PR. The intent is good, but I think the approach 
should be a bit different. As it happens, I have implemented this other 
approach. Would you be OK if I submit that as a PR and we can compare?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #9220: KAFKA-10433 Reuse the ByteBuffer in validating compressed records

2020-08-25 Thread GitBox


chia7712 opened a new pull request #9220:
URL: https://github.com/apache/kafka/pull/9220


   issue: https://issues.apache.org/jira/browse/KAFKA-10433
   
   It is a hot method, so reusing the ByteBuffer can reduce a lot of memory 
allocation if the compression type supports BufferSupplier.
   
   **experiment**
   - duration: 1 minute
   - compression: LZ4
   - memory usage of byte array allocation: 4.52 GB -> 0.68 GB
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet opened a new pull request #9219: KAFKA-10432: LeaderEpochCache is incorrectly recovered for leader epoch 0

2020-08-25 Thread GitBox


lbradstreet opened a new pull request #9219:
URL: https://github.com/apache/kafka/pull/9219


   The leader epoch cache is incorrectly recovered for epoch 0 as the
   assignment is skipped when epoch == 0. This check was likely intended to
   prevent negative epochs from being applied or there was an assumption
   that epochs started at 1.
   
   A test has been added to LogSegmentTest to show the LogSegment
   recovery path works for the epoch cache. This was a test gap.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10433) Reuse the ByteBuffer in validating compressed records

2020-08-25 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10433:
--

 Summary: Reuse the ByteBuffer in validating compressed records 
 Key: KAFKA-10433
 URL: https://issues.apache.org/jira/browse/KAFKA-10433
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


{code:java}
for (batch <- batches) {
  validateBatch(topicPartition, firstBatch, batch, origin, toMagic, 
brokerTopicStats)
  uncompressedSizeInBytes += 
AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())

  val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= 
RecordBatch.MAGIC_VALUE_V2)
batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)
  else
batch.streamingIterator(BufferSupplier.NO_CACHING)
{code}

It is a hot method, so reusing the ByteBuffer can reduce a lot of memory 
allocation if the compression type supports BufferSupplier.
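
As a rough sketch of the idea (the validate method below is a placeholder, not 
the real LogValidator code): create one caching BufferSupplier per validation 
call and reuse it for every batch, instead of passing BufferSupplier.NO_CACHING 
each time.

{code:java}
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;

public class ReuseBufferSupplierSketch {
    static void validateBatches(final MemoryRecords records) {
        // One supplier for the whole call, so decompression buffers get recycled.
        final BufferSupplier bufferSupplier = BufferSupplier.create();
        for (final MutableRecordBatch batch : records.batches()) {
            validate(batch, bufferSupplier);
        }
    }

    private static void validate(final MutableRecordBatch batch, final BufferSupplier supplier) {
        // Placeholder: the real code would iterate the batch with
        // batch.streamingIterator(supplier) and validate each record.
    }
}
{code}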



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10432) LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0

2020-08-25 Thread Lucas Bradstreet (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184200#comment-17184200
 ] 

Lucas Bradstreet commented on KAFKA-10432:
--

After further discussion with Jason Gustafson, this should not result in 
any incorrect truncations, as the end offset search will still find the right 
offset given that only the first epoch start offset can potentially be 
incorrect. Overall this should not be a blocker, though it should be fixed 
because there could be subtle situations where it matters.
 

> LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0
> -
>
> Key: KAFKA-10432
> URL: https://issues.apache.org/jira/browse/KAFKA-10432
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Lucas Bradstreet
>Priority: Major
>
> I added some functionality to the system tests to compare epoch cache 
> lineages ([https://github.com/apache/kafka/pull/9213]), and I found a bug in 
> leader epoch cache recovery.
> The test hard kills a broker and the cache hasn't been flushed yet, and then 
> it starts up and goes through log recovery. After recovery there is 
> divergence in the epoch caches for epoch 0:
> {noformat}
> AssertionError: leader epochs for output-topic-1 didn't match
>  [{0: 9393L, 2: 9441L, 4: 42656L},
>  {0: 0L, 2: 9441L, 4: 42656L}, 
>  {0: 0L, 2: 9441L, 4: 42656L}]
>   
>   
> {noformat}
> The cache is supposed to include the offset for epoch 0 but in recovery it 
> skips it 
> [https://github.com/apache/kafka/blob/487b3682ebe0eefde3445b37ee72956451a9d15e/core/src/main/scala/kafka/log/LogSegment.scala#L364]
>  due to 
> [https://github.com/apache/kafka/commit/d152989f26f51b9004b881397db818ad6eaf0392].
>  Then it stamps the epoch with a later offset when fetching from the leader.
> I'm not sure why the recovery code includes the condition 
> `batch.partitionLeaderEpoch > 0`. I discussed this with Jason Gustafson and 
> he believes it may have been intended to avoid assigning negative epochs but 
> is not sure why it was added. None of the tests fail with this check removed.
> {noformat}
>   leaderEpochCache.foreach { cache =>
> if (batch.partitionLeaderEpoch > 0 && 
> cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
>   cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
>   }
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10432) LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0

2020-08-25 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-10432:


 Summary: LeaderEpochCache is incorrectly recovered on segment 
recovery for epoch 0
 Key: KAFKA-10432
 URL: https://issues.apache.org/jira/browse/KAFKA-10432
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.6.0, 2.5.0, 2.4.0, 2.3.0
Reporter: Lucas Bradstreet


I added some functionality to the system tests to compare epoch cache lineages 
([https://github.com/apache/kafka/pull/9213]), and I found a bug in leader 
epoch cache recovery.

The test hard kills a broker and the cache hasn't been flushed yet, and then it 
starts up and goes through log recovery. After recovery there is divergence in 
the epoch caches for epoch 0:
{noformat}
AssertionError: leader epochs for output-topic-1 didn't match
 [{0: 9393L, 2: 9441L, 4: 42656L},
 {0: 0L, 2: 9441L, 4: 42656L}, 
 {0: 0L, 2: 9441L, 4: 42656L}]  

  
{noformat}
The cache is supposed to include the offset for epoch 0 but in recovery it 
skips it 
[https://github.com/apache/kafka/blob/487b3682ebe0eefde3445b37ee72956451a9d15e/core/src/main/scala/kafka/log/LogSegment.scala#L364]
 due to 
[https://github.com/apache/kafka/commit/d152989f26f51b9004b881397db818ad6eaf0392].
 Then it stamps the epoch with a later offset when fetching from the leader.

I'm not sure why the recovery code includes the condition 
`batch.partitionLeaderEpoch > 0`. I discussed this with Jason Gustafson and he 
believes it may have been intended to avoid assigning negative epochs but is 
not sure why it was added. None of the tests fail with this check removed.
{noformat}
  leaderEpochCache.foreach { cache =>
if (batch.partitionLeaderEpoch > 0 && 
cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
  cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
  }
{noformat}
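
A tiny stand-alone illustration of the condition (plain Java with made-up 
names, not the actual recovery code): the existing check drops epoch 0 along 
with the negative epochs it presumably meant to exclude, whereas one possible 
fix is to accept epoch 0 while still rejecting negative values.

{code:java}
public class EpochRecoveryConditionSketch {
    static boolean currentCheck(final int batchEpoch) {
        return batchEpoch > 0;   // skips a legitimate epoch 0 entry during recovery
    }

    static boolean proposedCheck(final int batchEpoch) {
        return batchEpoch >= 0;  // still rejects negative/sentinel epochs
    }

    public static void main(final String[] args) {
        System.out.println(currentCheck(0));  // false -> epoch 0 lost on recovery
        System.out.println(proposedCheck(0)); // true  -> epoch 0 retained
    }
}
{code}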



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ning2008wisc commented on a change in pull request #9215: KAFKA-10133: MM2 readme update on config

2020-08-25 Thread GitBox


ning2008wisc commented on a change in pull request #9215:
URL: https://github.com/apache/kafka/pull/9215#discussion_r476567599



##
File path: connect/mirror/README.md
##
@@ -141,7 +141,38 @@ nearby clusters.
 N.B. that the `--clusters` parameter is not technically required here. MM2 
will work fine without it; however, throughput may suffer from "producer lag" 
between
 data centers, and you may incur unnecessary data transfer costs.
 
-## Shared configuration
+## Configuration
+### General Kafka Connect Config
+All Kafka Connect, Source Connector, Sink Connector configs, as defined in 
[Kafka official doc] (https://kafka.apache.org/documentation/#connectconfigs), 
can be 
+directly used in MM2 configuration without prefix in the configuration name. 
As the starting point, most of these configs may work well with the exception 
of `tasks.max`.

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-25 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r476546410



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 }
 }
 
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp rightWinAgg = null;
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+boolean rightWinAlreadyCreated = false;
+final HashSet windowStartTimes = new HashSet();
+
+try (
+final KeyValueIterator, 
ValueAndTimestamp> iterator = windowStore.fetch(
+key,
+key,
+timestamp - 2 * windows.timeDifferenceMs(),
+// to catch the current record's right window, if 
it exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long endTime = startTime + 
windows.timeDifferenceMs();
+
+if (endTime == windows.timeDifferenceMs()) {
+combinedWindow = next;
+} else if (endTime > timestamp && startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+if (combinedWindow == null) {
+final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+} else {
+//create the right window for the most recent max timestamp in 
the combined window
+final long rightWinStart = combinedWindow.value.timestamp() + 
1;

Review comment:
   I went with `maxRightWindowStart` since it's not always going to be the 
previous record, either in terms of process order or time order.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-25 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r476531518



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean rightWinAlreadyCreated = false;
 
 // keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+KeyValue, ValueAndTimestamp> latestLeftTypeWindow 
= null;
 try (
 final KeyValueIterator, 
ValueAndTimestamp> iterator = windowStore.fetch(

Review comment:
   Fetch having negative bounds doesn't throw any errors or cause any 
issues. Is there a different reason to make sure the bounds aren't negative? 
Since we don't store windows with a negative start time, it shouldn't return 
anything we don't expect.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-25 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r476531745



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -211,6 +217,67 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 }
 }
 
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp rightWinAgg = null;
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+boolean rightWinAlreadyCreated = false;
+final HashSet windowStartTimes = new HashSet();
+
+try (
+final KeyValueIterator, 
ValueAndTimestamp> iterator = windowStore.fetch(
+key,
+key,
+timestamp - 2 * windows.timeDifferenceMs(),

Review comment:
   Same as above, it seems like it still works as expected





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10430) Hook support

2020-08-25 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass reassigned KAFKA-10430:
---

Assignee: Viktor Somogyi-Vass

> Hook support
> 
>
> Key: KAFKA-10430
> URL: https://issues.apache.org/jira/browse/KAFKA-10430
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dennis Jaheruddin
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Currently big data storage (like HDFS and HBASE) allows other tooling to hook 
> into it, for instance atlas.
> As data movement tools become more open to Kafka as well, it makes sense to 
> shift significant amounts of storage to Kafka, for instance when one just 
> needs a buffer.
> However, this may be blocked due to governance constraints. As currently 
> producers and consumers would need to actively make an effort to log 
> governance (where something like HDFS can guarantee its capture).
> Hence I believe we should make it possible to hook into kafka as well so one 
> does not simply depend on the integrity of the producers and consumers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10431) ProducerPerformance with payloadFile arg: add support for sequential or random outputs

2020-08-25 Thread Zaahir Laher (Jira)
Zaahir Laher created KAFKA-10431:


 Summary: ProducerPerformance with payloadFile arg: add support for 
sequential or random outputs
 Key: KAFKA-10431
 URL: https://issues.apache.org/jira/browse/KAFKA-10431
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Affects Versions: 2.5.1
Reporter: Zaahir Laher


When using ProducerPerformance with the --payloadFile argument and a file 
containing multiple payloads (i.e. the default is one payload per line), 
ProducerPerformance randomly chooses payloads from the file.

This could result in the same payload being sent repeatedly, which may not be 
the desired result in some cases.

It would be useful to also have another argument that allows for sequential 
payload submission if required. If left blank, this arg would default to false 
(i.e. default to random selection).
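
A minimal sketch of the proposed behaviour (the field names and flag below are 
made up for illustration, not the actual ProducerPerformance code):

{code:java}
import java.util.List;
import java.util.Random;

public class PayloadSelectionSketch {
    private final List<byte[]> payloads;
    private final boolean sequential;   // hypothetical "sequential payloads" flag
    private final Random random = new Random(0);
    private int nextIndex = 0;

    PayloadSelectionSketch(final List<byte[]> payloads, final boolean sequential) {
        this.payloads = payloads;
        this.sequential = sequential;
    }

    byte[] nextPayload() {
        if (sequential) {
            final byte[] payload = payloads.get(nextIndex);
            nextIndex = (nextIndex + 1) % payloads.size(); // wrap around the file
            return payload;
        }
        return payloads.get(random.nextInt(payloads.size())); // current behaviour
    }
}
{code}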



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10430) Hook support

2020-08-25 Thread Dennis Jaheruddin (Jira)
Dennis Jaheruddin created KAFKA-10430:
-

 Summary: Hook support
 Key: KAFKA-10430
 URL: https://issues.apache.org/jira/browse/KAFKA-10430
 Project: Kafka
  Issue Type: Improvement
Reporter: Dennis Jaheruddin


Currently big data storage (like HDFS and HBase) allows other tooling to hook 
into it, for instance Atlas.

As data movement tools become more open to Kafka as well, it makes sense to 
shift significant amounts of storage to Kafka, for instance when one just needs 
a buffer.

However, this may be blocked due to governance constraints, as currently 
producers and consumers would need to actively make an effort to log governance 
(whereas something like HDFS can guarantee its capture).

Hence I believe we should make it possible to hook into Kafka as well, so one 
does not simply depend on the integrity of the producers and consumers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10293) fix flaky streams/streams_eos_test.py

2020-08-25 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-10293.
---
Resolution: Fixed

> fix flaky streams/streams_eos_test.py
> -
>
> Key: KAFKA-10293
> URL: https://issues.apache.org/jira/browse/KAFKA-10293
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.streams.streams_eos_test
> Class:  StreamsEosTest
> Method: test_failure_and_recovery_complex
> Arguments:
> {
>   "processing_guarantee": "exactly_once"
> }
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10293) fix flaky streams/streams_eos_test.py

2020-08-25 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-10293:
-

Assignee: Bruno Cadonna

> fix flaky streams/streams_eos_test.py
> -
>
> Key: KAFKA-10293
> URL: https://issues.apache.org/jira/browse/KAFKA-10293
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.streams.streams_eos_test
> Class:  StreamsEosTest
> Method: test_failure_and_recovery_complex
> Arguments:
> {
>   "processing_guarantee": "exactly_once"
> }
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10293) fix flaky streams/streams_eos_test.py

2020-08-25 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183927#comment-17183927
 ] 

Bruno Cadonna commented on KAFKA-10293:
---

[~chia7712] I will close this ticket because it seems this test got fixed. I 
checked recent runs of system tests and this test has never failed.

Feel free to reopen, if you think this test is still an issue.

> fix flaky streams/streams_eos_test.py
> -
>
> Key: KAFKA-10293
> URL: https://issues.apache.org/jira/browse/KAFKA-10293
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.streams.streams_eos_test
> Class:  StreamsEosTest
> Method: test_failure_and_recovery_complex
> Arguments:
> {
>   "processing_guarantee": "exactly_once"
> }
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure

2020-08-25 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183918#comment-17183918
 ] 

Bruno Cadonna edited comment on KAFKA-10357 at 8/25/20, 10:30 AM:
--

Yes, I also agree that initialize + config is the easiest way to solve this 
issue and we should follow this way. 


was (Author: cadonna):
Yes, I also agree that initialize + config is the easiest way to solve this 
issue and we follow this way. 

> Handle accidental deletion of repartition-topics as exceptional failure
> ---
>
> Key: KAFKA-10357
> URL: https://issues.apache.org/jira/browse/KAFKA-10357
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> Repartition topics are both written by Streams' producer and read by Streams' 
> consumer, so when they are accidentally deleted both clients may be notified. 
> But in practice the consumer reacts much more quickly than the producer, 
> since the latter has a delivery timeout expiration period (see 
> https://issues.apache.org/jira/browse/KAFKA-10356). When the consumer reacts, 
> it re-joins the group since the metadata changed, and during the triggered 
> rebalance it would silently auto-recreate the topic and continue, causing 
> silent data loss. 
> One idea is to create all repartition topics only *once*, in the first 
> rebalance, and not auto-create them in future rebalances; instead, a missing 
> repartition topic would be treated similarly to the 
> INCOMPLETE_SOURCE_TOPIC_METADATA error code 
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenging part is how to determine whether it is the first-ever 
> rebalance; here are several wild ideas:
> 1) change the thread state transition diagram so that the STARTING state 
> transitions only to PARTITION_ASSIGNED and never to PARTITION_REVOKED; then in 
> the assign function we can check whether the state is still CREATED and not 
> RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the 
> first-ever rebalance.
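
A hedged sketch of idea (2), purely for illustration; the class name, field, and 
assignor-side check below are hypothetical and not the actual SubscriptionInfo or 
assignor code:

{code:java}
// Hypothetical illustration only; names and wire format are made up.
public class SubscriptionUserDataSketch {

    // true only while this member has never completed a rebalance
    private final boolean firstRebalance;

    public SubscriptionUserDataSketch(final boolean firstRebalance) {
        this.firstRebalance = firstRebalance;
    }

    public boolean firstRebalance() {
        return firstRebalance;
    }
}

// Sketched assignor-side check:
//   if every member reports firstRebalance == true -> create missing repartition topics
//   else, if a repartition topic is missing        -> fail the rebalance, similar to
//                                                     INCOMPLETE_SOURCE_TOPIC_METADATA
{code}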



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure

2020-08-25 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183918#comment-17183918
 ] 

Bruno Cadonna commented on KAFKA-10357:
---

Yes, I also agree that initialize + config is the easiest way to solve this 
issue and we follow this way. 

> Handle accidental deletion of repartition-topics as exceptional failure
> ---
>
> Key: KAFKA-10357
> URL: https://issues.apache.org/jira/browse/KAFKA-10357
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> Repartition topics are both written by Streams' producer and read by Streams' 
> consumer, so when they are accidentally deleted both clients may be notified. 
> But in practice the consumer reacts much more quickly than the producer, 
> since the latter has a delivery timeout expiration period (see 
> https://issues.apache.org/jira/browse/KAFKA-10356). When the consumer reacts, 
> it re-joins the group since the metadata changed, and during the triggered 
> rebalance it would silently auto-recreate the topic and continue, causing 
> silent data loss. 
> One idea is to create all repartition topics only *once*, in the first 
> rebalance, and not auto-create them in future rebalances; instead, a missing 
> repartition topic would be treated similarly to the 
> INCOMPLETE_SOURCE_TOPIC_METADATA error code 
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenging part is how to determine whether it is the first-ever 
> rebalance; here are several wild ideas:
> 1) change the thread state transition diagram so that the STARTING state 
> transitions only to PARTITION_ASSIGNED and never to PARTITION_REVOKED; then in 
> the assign function we can check whether the state is still CREATED and not 
> RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the 
> first-ever rebalance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-10423) Logtash is restarting with invalid_fetch_session_epoch error

2020-08-25 Thread Vaibhav Nagpal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vaibhav Nagpal updated KAFKA-10423:
---
Comment: was deleted

(was: I am also experience this error in confluentinc-kafka-connect-s3-5.3.0 
(which should have Kafka 2.3.0 client version)


{code:java}
[2020-08-23 00:41:56,831] INFO [Consumer clientId=consumer-109, 
groupId=connect-ls_verify_qa] Node 3 was unable to process the fetch request 
with (sessionId=1446130732, epoch=126166): INVALID_FETCH_SESSION_EPOCH. 
(org.apache.kafka.clients.FetchSessionHandler){code}


However Kafka connect seems to be able to process things)

> Logtash is restarting with invalid_fetch_session_epoch error
> 
>
> Key: KAFKA-10423
> URL: https://issues.apache.org/jira/browse/KAFKA-10423
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, KafkaConnect, logging
>Affects Versions: 2.3.0
>Reporter: Akshay Sharma
>Priority: Major
>
> Logstash (input plugin is kafka) is restarting again and again with the error 
> mentioned below
>  
> logs:
> ```
> {"log":"[INFO ] 2020-07-27 05:03:43.873 [Ruby-0-Thread-12: 
> /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:244]
>  FetchSessionHandler - [Consumer clientId=logstash-0, groupId=logstash] Node 
> 1001 was unable to process the fetch request with (sessionId=2115239606, 
> epoch=1128): 
> INVALID_FETCH_SESSION_EPOCH.\n","stream":"stdout","time":"2020-07-27T05:03:43.873634303Z"}
>  \{"log":"[WARN ] 2020-07-27 05:14:18.976 [SIGTERM handler] runner - SIGTERM 
> received. Shutting 
> down.\n","stream":"stdout","time":"2020-07-27T05:14:18.976808493Z"}
>  
> {"log":"[INFO ] 2020-07-27 05:14:19.030 [Ruby-0-Thread-12: 
> /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:244]
>  AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] 
> Sending LeaveGroup request to coordinator 
> ikafka-0.kafka-headless.default.svc.cluster.local:9092 (id: 2147482646 rack: 
> null)\n","stream":"stdout","time":"2020-07-27T05:14:19.031121275Z"}
> ```
> KAFKA LOGS
> ```
> {"log":"[2020-07-27 05:14:19,031] INFO [GroupCoordinator 1001]: Member 
> logstash-0-9a59ad3d-c6ab-4dba-9775-5974d17934d1 in group logstash has left, 
> removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:14:19.032132241Z"}
>  \{"log":"[2020-07-27 05:14:19,032] INFO [GroupCoordinator 1001]: Preparing 
> to rebalance group logstash in state PreparingRebalance with old generation 
> 85 (__consumer_offsets-49) (reason: removing member 
> logstash-0-9a59ad3d-c6ab-4dba-9775-5974d17934d1 on LeaveGroup) 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:14:19.032320407Z"}
>  \{"log":"[2020-07-27 05:14:19,032] INFO [GroupCoordinator 1001]: Group 
> logstash with generation 86 is now empty (__consumer_offsets-49) 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:14:19.032619661Z"}
>  
> {"log":"[2020-07-27 05:15:10,766] INFO [GroupCoordinator 1001]: Preparing to 
> rebalance group logstash in state PreparingRebalance with old generation 86 
> (__consumer_offsets-49) (reason: Adding new member 
> logstash-0-bde43584-a21e-4a0d-92cc-69196f213f11 with group instanceid None) 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:15:10.767053896Z"}
> ```
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10423) Logtash is restarting with invalid_fetch_session_epoch error

2020-08-25 Thread Vaibhav Nagpal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183881#comment-17183881
 ] 

Vaibhav Nagpal edited comment on KAFKA-10423 at 8/25/20, 9:16 AM:
--

I am also experiencing this error in confluentinc-kafka-connect-s3-5.3.0 (which 
should have the Kafka 2.3.0 client version)


{code:java}
[2020-08-23 00:41:56,831] INFO [Consumer clientId=consumer-109, 
groupId=connect-ls_verify_qa] Node 3 was unable to process the fetch request 
with (sessionId=1446130732, epoch=126166): INVALID_FETCH_SESSION_EPOCH. 
(org.apache.kafka.clients.FetchSessionHandler){code}


However, Kafka Connect seems to be able to process things.


was (Author: vaibhavrtk):
I am also experience this error in confluentinc-kafka-connect-s3-5.3.0 (which 
should have Kafka 2.3.0 client version)

> Logtash is restarting with invalid_fetch_session_epoch error
> 
>
> Key: KAFKA-10423
> URL: https://issues.apache.org/jira/browse/KAFKA-10423
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, KafkaConnect, logging
>Affects Versions: 2.3.0
>Reporter: Akshay Sharma
>Priority: Major
>
> Logstash (input plugin is kafka) is restarting again and again with the error 
> mentioned below
>  
> logs:
> ```
> {"log":"[INFO ] 2020-07-27 05:03:43.873 [Ruby-0-Thread-12: 
> /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:244]
>  FetchSessionHandler - [Consumer clientId=logstash-0, groupId=logstash] Node 
> 1001 was unable to process the fetch request with (sessionId=2115239606, 
> epoch=1128): 
> INVALID_FETCH_SESSION_EPOCH.\n","stream":"stdout","time":"2020-07-27T05:03:43.873634303Z"}
>  \{"log":"[WARN ] 2020-07-27 05:14:18.976 [SIGTERM handler] runner - SIGTERM 
> received. Shutting 
> down.\n","stream":"stdout","time":"2020-07-27T05:14:18.976808493Z"}
>  
> {"log":"[INFO ] 2020-07-27 05:14:19.030 [Ruby-0-Thread-12: 
> /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:244]
>  AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] 
> Sending LeaveGroup request to coordinator 
> ikafka-0.kafka-headless.default.svc.cluster.local:9092 (id: 2147482646 rack: 
> null)\n","stream":"stdout","time":"2020-07-27T05:14:19.031121275Z"}
> ```
> KAFKA LOGS
> ```
> {"log":"[2020-07-27 05:14:19,031] INFO [GroupCoordinator 1001]: Member 
> logstash-0-9a59ad3d-c6ab-4dba-9775-5974d17934d1 in group logstash has left, 
> removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:14:19.032132241Z"}
>  \{"log":"[2020-07-27 05:14:19,032] INFO [GroupCoordinator 1001]: Preparing 
> to rebalance group logstash in state PreparingRebalance with old generation 
> 85 (__consumer_offsets-49) (reason: removing member 
> logstash-0-9a59ad3d-c6ab-4dba-9775-5974d17934d1 on LeaveGroup) 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:14:19.032320407Z"}
>  \{"log":"[2020-07-27 05:14:19,032] INFO [GroupCoordinator 1001]: Group 
> logstash with generation 86 is now empty (__consumer_offsets-49) 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:14:19.032619661Z"}
>  
> {"log":"[2020-07-27 05:15:10,766] INFO [GroupCoordinator 1001]: Preparing to 
> rebalance group logstash in state PreparingRebalance with old generation 86 
> (__consumer_offsets-49) (reason: Adding new member 
> logstash-0-bde43584-a21e-4a0d-92cc-69196f213f11 with group instanceid None) 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:15:10.767053896Z"}
> ```
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10423) Logtash is restarting with invalid_fetch_session_epoch error

2020-08-25 Thread Vaibhav Nagpal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183881#comment-17183881
 ] 

Vaibhav Nagpal commented on KAFKA-10423:


I am also experiencing this error in confluentinc-kafka-connect-s3-5.3.0 (which 
should have the Kafka 2.3.0 client version)

> Logtash is restarting with invalid_fetch_session_epoch error
> 
>
> Key: KAFKA-10423
> URL: https://issues.apache.org/jira/browse/KAFKA-10423
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, KafkaConnect, logging
>Affects Versions: 2.3.0
>Reporter: Akshay Sharma
>Priority: Major
>
> Logstash (input plugin is kafka) is restarting again and again with the error 
> mentioned below
>  
> logs:
> ```
> {"log":"[INFO ] 2020-07-27 05:03:43.873 [Ruby-0-Thread-12: 
> /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:244]
>  FetchSessionHandler - [Consumer clientId=logstash-0, groupId=logstash] Node 
> 1001 was unable to process the fetch request with (sessionId=2115239606, 
> epoch=1128): 
> INVALID_FETCH_SESSION_EPOCH.\n","stream":"stdout","time":"2020-07-27T05:03:43.873634303Z"}
>  \{"log":"[WARN ] 2020-07-27 05:14:18.976 [SIGTERM handler] runner - SIGTERM 
> received. Shutting 
> down.\n","stream":"stdout","time":"2020-07-27T05:14:18.976808493Z"}
>  
> {"log":"[INFO ] 2020-07-27 05:14:19.030 [Ruby-0-Thread-12: 
> /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:244]
>  AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] 
> Sending LeaveGroup request to coordinator 
> ikafka-0.kafka-headless.default.svc.cluster.local:9092 (id: 2147482646 rack: 
> null)\n","stream":"stdout","time":"2020-07-27T05:14:19.031121275Z"}
> ```
> KAFKA LOGS
> ```
> {"log":"[2020-07-27 05:14:19,031] INFO [GroupCoordinator 1001]: Member 
> logstash-0-9a59ad3d-c6ab-4dba-9775-5974d17934d1 in group logstash has left, 
> removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:14:19.032132241Z"}
>  \{"log":"[2020-07-27 05:14:19,032] INFO [GroupCoordinator 1001]: Preparing 
> to rebalance group logstash in state PreparingRebalance with old generation 
> 85 (__consumer_offsets-49) (reason: removing member 
> logstash-0-9a59ad3d-c6ab-4dba-9775-5974d17934d1 on LeaveGroup) 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:14:19.032320407Z"}
>  \{"log":"[2020-07-27 05:14:19,032] INFO [GroupCoordinator 1001]: Group 
> logstash with generation 86 is now empty (__consumer_offsets-49) 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:14:19.032619661Z"}
>  
> {"log":"[2020-07-27 05:15:10,766] INFO [GroupCoordinator 1001]: Preparing to 
> rebalance group logstash in state PreparingRebalance with old generation 86 
> (__consumer_offsets-49) (reason: Adding new member 
> logstash-0-bde43584-a21e-4a0d-92cc-69196f213f11 with group instanceid None) 
> (kafka.coordinator.group.GroupCoordinator)\n","stream":"stdout","time":"2020-07-27T05:15:10.767053896Z"}
> ```
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-25 Thread GitBox


cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-679884365


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-25 Thread GitBox


chia7712 edited a comment on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-679855792


   > The second issue is that we hold a group lock while calling 
joinPurgatory.tryCompleteElseWatch. In this call, it's possible that 
DelayedJoin.onComplete() will be called. In that case, since the caller holds 
the group lock, we won't be completing partitionsToComplete in 
completeDelayedRequests().
   
   @junrao I went through the group lock again and it is used almost everywhere :( 
   
   I'm worried about deadlocks caused by the current approach, so I'd like to adopt 
@hachikuji's refactoring and your approach (separate thread). The following changes 
are included.
   
   1. ```Partition``` does not complete delayed operations itself. Instead, it adds 
those delayed operations to a queue. Callers ought to call the new method 
```Partition.completeDelayedActions``` to complete those delayed operations in a 
proper place (to avoid conflicting locking).
   1. apply the above new method to all callers who need to complete delayed 
operations.
   1. ```GroupCoordinator.onCompleteJoin``` is called by ```DelayedJoin``` only, 
but it is always invoked while the group lock is held. To resolve this, we complete 
delayed requests in a separate thread. 
   1. keep using ```tryLock```. The known conflicting locks should be resolved by 
the above changes, and ```tryLock``` protects us from deadlocks we have not yet 
noticed :)
   
   @junrao WDYT?
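   
   A rough sketch of idea (1), just to make the shape concrete; the class, field, 
and method bodies below are hypothetical and not code from this PR:
   
   ```java
   // Hypothetical sketch: Partition only records delayed operations while locks are
   // held; the caller completes them later, outside any partition/group lock.
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   class PartitionSketch {
       private final ConcurrentLinkedQueue<Runnable> delayedActions = new ConcurrentLinkedQueue<>();
   
       // called while holding locks: only remember the work, never run it here
       void enqueueDelayedAction(final Runnable action) {
           delayedActions.add(action);
       }
   
       // called by the original caller after all locks have been released
       void completeDelayedActions() {
           Runnable action;
           while ((action = delayedActions.poll()) != null) {
               action.run();
           }
       }
   }
   ```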



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-25 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-679855792


   > The second issue is that we hold a group lock while calling 
joinPurgatory.tryCompleteElseWatch. In this call, it's possible that 
DelayedJoin.onComplete() will be called. In that case, since the caller holds 
the group lock, we won't be completing partitionsToComplete in 
completeDelayedRequests().
   
   @junrao I went through the group lock again and it is used almost everywhere :( 
   
   I'm worried about deadlocks caused by the current approach, so I'd like to adopt 
@hachikuji's refactoring and your approach (separate thread). The following changes 
are included.
   
   1. ```Partition``` does not complete delayed operations itself. Instead, it adds 
those delayed operations to a queue. Callers ought to call the new method 
```Partition.completeDelayedActions``` to complete those delayed operations in a 
proper place (to avoid conflicting locking).
   1. apply the above new method to all callers who need to complete delayed 
operations.
   1. ```GroupCoordinator.onCompleteJoin``` is called by ```DelayedJoin``` only, 
but it is always invoked while the group lock is held. To resolve this, we complete 
delayed requests in a separate thread. 
   
   @junrao WDYT?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted

2020-08-25 Thread Ilia Pasynkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183802#comment-17183802
 ] 

Ilia Pasynkov commented on KAFKA-10362:
---

[~guozhang] Hello. I've read the related TaskManager and StreamTask code. Please 
correct me if I'm wrong: it seems that I have to call OffsetCheckpoint's delete() 
method in StreamTask's resume() method [when the task's state is SUSPENDED].
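
A minimal sketch of that idea, with simplified, hypothetical names (only the call to 
the checkpoint file's delete() comes from the discussion above; the rest is not the 
actual StreamTask code):

{code:java}
// Hypothetical illustration; not the real StreamTask.
class SuspendableTaskSketch {
    enum State { SUSPENDED, RESTORING }

    private State state = State.SUSPENDED;
    private final boolean eosEnabled;
    private final java.io.File checkpointFile;

    SuspendableTaskSketch(final boolean eosEnabled, final java.io.File checkpointFile) {
        this.eosEnabled = eosEnabled;
        this.checkpointFile = checkpointFile;
    }

    void resume() {
        if (state == State.SUSPENDED) {
            if (eosEnabled) {
                // the checkpoint written on suspend no longer marks a clean close
                checkpointFile.delete();
            }
            state = State.RESTORING;
        }
    }
}
{code}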

> When resuming Streams active task with EOS, the checkpoint file should be 
> deleted
> -
>
> Key: KAFKA-10362
> URL: https://issues.apache.org/jira/browse/KAFKA-10362
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> Today, when we suspend a task we commit, and along with the commit we always 
> write the checkpoint file even if we are eosEnabled (since the state is already 
> SUSPENDED). But the suspended task may later be resumed, and in that case the 
> checkpoint file should be deleted, since it should only be written when the task 
> is cleanly closed.
> With our latest rebalance protocol in KIP-429, resume would not be called 
> since all suspended tasks would be closed, but with the old eager protocol it 
> may still be called — I think that may be the reason we did not hit this often.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10426) Deadlock in KafkaConfigBackingStore

2020-08-25 Thread Goltseva Taisiia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183758#comment-17183758
 ] 

Goltseva Taisiia edited comment on KAFKA-10426 at 8/25/20, 6:47 AM:


Hi, [~ChrisEgerton], [~kkonstantine]  !

Could you, please, take a look at my PR?


was (Author: xakassi):
Hi, [~ChrisEgerton] !

Could you, please, take a look at my PR?

> Deadlock in KafkaConfigBackingStore
> ---
>
> Key: KAFKA-10426
> URL: https://issues.apache.org/jira/browse/KAFKA-10426
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.1, 2.6.0
>Reporter: Goltseva Taisiia
>Assignee: Goltseva Taisiia
>Priority: Critical
>  Labels: pull-request-available
>
> Hi, guys!
> We faced the following deadlock:
>  
> {code:java}
> KafkaBasedLog Work Thread - _streaming_service_config
> priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId 
> (decimal):2384 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586)
> - waiting to lock <0xe6136808> (a 
> com.company.streaming.platform.kafka.DistributedHerder)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707)
> - locked <0xd8c3be40> (a java.lang.Object)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
> at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337)
> CustomDistributedHerder-connect-1
> priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId 
> (decimal):2362 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285)
> - waiting to lock <0xd8c3be40> (a java.lang.Object)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514)
> - locked <0xe6136808> (a 
> com.company.streaming.platform.kafka.DistributedHerder)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748){code}
> The DistributedHerder entered the synchronized method 
> updateConfigsWithIncrementalCooperative() and called configBackingStore.snapshot(), 
> which takes a lock on an internal object of the KafkaConfigBackingStore class.
>  
> Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized 
> block on that internal object, got a SESSION_KEY record and called 
> updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.
>  
> As far as I can see, the problem is here:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737]
>  
> As I understand it, this call should be performed outside the synchronized block:
> {code:java}
> if (started)
>
> updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code}
>  
> I'm going to make a PR.
>  
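
A before/after sketch of that suggestion, with simplified names (this is an 
illustration of moving the callback out of the synchronized block, not the actual 
KafkaConfigBackingStore code):

{code:java}
// Before (sketch): the listener is invoked while the store's internal lock is held,
// so this thread takes the herder lock while another thread, already holding the
// herder lock, is waiting for the store lock in snapshot(): a lock-order inversion.
synchronized (lock) {
    sessionKey = newKey;
    if (started)
        updateListener.onSessionKeyUpdate(sessionKey);
}

// After (sketch): capture the key under the lock and invoke the listener only after
// the lock has been released, so no thread ever holds both locks at once.
SessionKey keyToNotify = null;
synchronized (lock) {
    sessionKey = newKey;
    if (started)
        keyToNotify = sessionKey;
}
if (keyToNotify != null)
    updateListener.onSessionKeyUpdate(keyToNotify);
{code}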



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10426) Deadlock in KafkaConfigBackingStore

2020-08-25 Thread Goltseva Taisiia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183758#comment-17183758
 ] 

Goltseva Taisiia commented on KAFKA-10426:
--

Hi, [~ChrisEgerton] !

Could you, please, take a look at my PR?

> Deadlock in KafkaConfigBackingStore
> ---
>
> Key: KAFKA-10426
> URL: https://issues.apache.org/jira/browse/KAFKA-10426
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.1, 2.6.0
>Reporter: Goltseva Taisiia
>Assignee: Goltseva Taisiia
>Priority: Critical
>  Labels: pull-request-available
>
> Hi, guys!
> We faced the following deadlock:
>  
> {code:java}
> KafkaBasedLog Work Thread - _streaming_service_config
> priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId 
> (decimal):2384 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586)
> - waiting to lock <0xe6136808> (a 
> com.company.streaming.platform.kafka.DistributedHerder)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707)
> - locked <0xd8c3be40> (a java.lang.Object)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
> at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337)
> CustomDistributedHerder-connect-1
> priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId 
> (decimal):2362 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285)
> - waiting to lock <0xd8c3be40> (a java.lang.Object)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514)
> - locked <0xe6136808> (a 
> com.company.streaming.platform.kafka.DistributedHerder)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748){code}
> The DistributedHerder entered the synchronized method 
> updateConfigsWithIncrementalCooperative() and called configBackingStore.snapshot(), 
> which takes a lock on an internal object of the KafkaConfigBackingStore class.
>  
> Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized 
> block on that internal object, got a SESSION_KEY record and called 
> updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.
>  
> As far as I can see, the problem is here:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737]
>  
> As I understand it, this call should be performed outside the synchronized block:
> {code:java}
> if (started)
>
> updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code}
>  
> I'm going to make a PR.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9182: KAFKA-10403 Replace scala collection by java collection in generating…

2020-08-25 Thread GitBox


chia7712 commented on pull request #9182:
URL: https://github.com/apache/kafka/pull/9182#issuecomment-679805021


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

2020-08-25 Thread GitBox


chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-679805254


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org