[GitHub] [kafka] chia7712 commented on pull request #8659: KAFKA-9617 Replica Fetcher can mark partition as failed when max.mess…

2020-05-14 Thread GitBox


chia7712 commented on pull request #8659:
URL: https://github.com/apache/kafka/pull/8659#issuecomment-629044613


   @guozhangwang @showuon Could you take a look? thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-14 Thread GitBox


ableegoldman commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r425577307



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
##
@@ -114,7 +105,7 @@ public void put(final Bytes key,
 
 void log(final Bytes key,
  final byte[] value) {
-changeLogger.logChange(key, value);
+context.logChange(name(), key, value, context.timestamp());

Review comment:
   We now just delegate to the context to figure out how/what to log









[GitHub] [kafka] ableegoldman commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-14 Thread GitBox


ableegoldman commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r425577094



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
##
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.StateSerdes;
-
-/**
- * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
- * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
- * i.e. use {@code StoreChangeLogger} rather than {@code StoreChangeLogger}.
- *
- * @param 
- * @param 
- */
-class StoreChangeLogger {
-
-private final String topic;
-private final int partition;
-private final ProcessorContext context;
-private final RecordCollector collector;
-private final Serializer keySerializer;
-private final Serializer valueSerializer;
-
-StoreChangeLogger(final String storeName,
-  final ProcessorContext context,
-  final StateSerdes serialization) {
-this(storeName, context, context.taskId().partition, serialization);
-}
-
-private StoreChangeLogger(final String storeName,
-  final ProcessorContext context,
-  final int partition,
-  final StateSerdes serialization) {
-topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
-this.context = context;
-this.partition = partition;
-this.collector = ((RecordCollector.Supplier) context).recordCollector();
-keySerializer = serialization.keySerializer();
-valueSerializer = serialization.valueSerializer();
-}
-
-void logChange(final K key,
-   final V value) {
-logChange(key, value, context.timestamp());
-}
-
-void logChange(final K key,

Review comment:
   This class was the root cause of the processor context issue blocking 
the active <--> standby task conversion. I was taking pieces out of it bit by 
bit and by the end it seemed pointless to have at all









[GitHub] [kafka] ableegoldman commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-14 Thread GitBox


ableegoldman commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r425576674



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -113,10 +113,12 @@ public void register(final StateStore store,
 }
 
 /**
+ * @throws UnsupportedOperationException if the current task type is standby

Review comment:
   No logical changes here, just added a check for any methods that were 
previously overridden in the standby context to throw UnsupportedOperation
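The guard described in this comment can be sketched roughly as follows. This is only an illustration of the pattern, not the code from the PR: `SketchTaskType`, `SketchProcessorContext`, the helper method name, and the placeholder return value are all assumptions made up for the example.

```java
/** Hypothetical task type; stands in for whatever the real context tracks. */
enum SketchTaskType { ACTIVE, STANDBY }

/** Hypothetical single context shared by active and standby tasks. */
final class SketchProcessorContext {
    private final SketchTaskType taskType;

    SketchProcessorContext(final SketchTaskType taskType) {
        this.taskType = taskType;
    }

    /** Called first by every method that only makes sense for active tasks. */
    private void throwIfStandby(final String method) {
        if (taskType == SketchTaskType.STANDBY) {
            throw new UnsupportedOperationException(
                method + "() is not supported in standby tasks");
        }
    }

    /**
     * @throws UnsupportedOperationException if the current task type is standby
     */
    String topic() {
        throwIfStandby("topic");
        return "input-topic"; // placeholder value for the sketch
    }

    public static void main(final String[] args) {
        System.out.println(new SketchProcessorContext(SketchTaskType.ACTIVE).topic());
        try {
            new SketchProcessorContext(SketchTaskType.STANDBY).topic();
        } catch (final UnsupportedOperationException e) {
            System.out.println("standby rejected: " + e.getMessage());
        }
    }
}
```

With one class and a runtime check, a task can change type without swapping its context object, which is presumably the point of consolidating the two contexts.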









[GitHub] [kafka] jiameixie commented on pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-05-14 Thread GitBox


jiameixie commented on pull request #8489:
URL: https://github.com/apache/kafka/pull/8489#issuecomment-629033628


   @cmccabe I updated it to openjdk:11. Is that ok?







[jira] [Resolved] (KAFKA-8311) Better consumer timeout exception handling

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8311.

Resolution: Duplicate

Closing this ticket: KIP-572 / KAFKA-9274 subsumes it.

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: WooYoung
>Priority: Major
>  Labels: newbie
>
> When a Streams application crashes due to an underlying consumer commit timeout, we 
> have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers on KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout at the stream level, we could wrap it 
> with a more meaningful message at either the consumer or stream level to let the user 
> know which consumer is having trouble.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jiameixie commented on pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-05-14 Thread GitBox


jiameixie commented on pull request #8489:
URL: https://github.com/apache/kafka/pull/8489#issuecomment-629031848


   @OneCricketeer So if we update to OpenJDK 11, there is no need to set 
default_jdk according to the machine architecture. Thanks for the reminder.







[GitHub] [kafka] jiameixie commented on pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-05-14 Thread GitBox


jiameixie commented on pull request #8489:
URL: https://github.com/apache/kafka/pull/8489#issuecomment-629030715


   @OneCricketeer openjdk:11 is multi-arch, and openjdk:8 is not.  
Running "docker run -it openjdk:11 bash" works on both arm and x86, 
while running "docker run -it openjdk:8 bash" produces an error.







[GitHub] [kafka] jiameixie commented on pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-05-14 Thread GitBox


jiameixie commented on pull request #8489:
URL: https://github.com/apache/kafka/pull/8489#issuecomment-629027425


   @OneCricketeer I tested just now. openjdk:11 is a multi-arch image.  x86 and arm 
use the same image name. 







[jira] [Commented] (KAFKA-7912) In-memory key-value store does not support concurrent access

2020-05-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107921#comment-17107921
 ] 

Matthias J. Sax commented on KAFKA-7912:


Can you dig out the ticket or PR? And then close this ticket?

> In-memory key-value store does not support concurrent access 
> -
>
> Key: KAFKA-7912
> URL: https://issues.apache.org/jira/browse/KAFKA-7912
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the in-memory key-value store uses a Map to store key-value pairs 
> and fetches them by calling subMap and returning an iterator to this submap. 
> This is unsafe as the submap is just a view of the original map and there is 
> risk of concurrent access.
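The hazard described in the ticket can be reproduced even on a single thread. The sketch below is not the Kafka Streams store and not necessarily how the issue was fixed; `SubMapViewSketch` and its methods are hypothetical names, and a plain `TreeMap` stands in for the store's map. It shows that an iterator over a `subMap` view breaks once the backing map is written to, and that copying the range avoids this. In a truly multi-threaded setting the copy itself would still need to be taken under the same lock as the writes.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class SubMapViewSketch {

    /** Builds a small sorted store; the contents are made up for the sketch. */
    static TreeMap<String, String> newStore() {
        final TreeMap<String, String> store = new TreeMap<>();
        store.put("a", "1");
        store.put("b", "2");
        store.put("c", "3");
        return store;
    }

    /** Returns true if an iterator over a subMap view breaks once the backing map is written to. */
    static boolean viewIteratorFailsAfterWrite() {
        final TreeMap<String, String> store = newStore();
        // subMap returns a live view backed by the original map, not a copy.
        final Iterator<Map.Entry<String, String>> iter =
            store.subMap("a", true, "c", true).entrySet().iterator();
        iter.next();
        store.put("bb", "4"); // structural change while the range iterator is open
        try {
            iter.next();
            return false;
        } catch (final ConcurrentModificationException e) {
            return true;
        }
    }

    /** Copies the requested range so later writes to the store cannot affect the returned map. */
    static NavigableMap<String, String> snapshotRange(final TreeMap<String, String> store,
                                                      final String from,
                                                      final String to) {
        return new TreeMap<>(store.subMap(from, true, to, true));
    }

    public static void main(final String[] args) {
        System.out.println("view iterator fails after write: " + viewIteratorFailsAfterWrite());
        final TreeMap<String, String> store = newStore();
        final NavigableMap<String, String> snapshot = snapshotRange(store, "a", "c");
        store.put("bb", "4");
        System.out.println("snapshot size after write: " + snapshot.size());
    }
}
```

Copying trades memory and copy time for isolation; whether that is acceptable depends on the typical range size, which the ticket does not state.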





[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2020-05-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107917#comment-17107917
 ] 

Matthias J. Sax commented on KAFKA-6520:


This is a quite tricky one... You can of course try. Maybe study the previous 
PR and the discussion on it to get some context before starting to write code? 
– And of course, feel free to ask any questions.

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which lead me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded in the `connect()` and `pollSelectionKeys()` functions upon catching the 
> IOException / RuntimeException that indicates the connection was disconnected.
>  * Users of Consumer / Streams can then monitor this metric, which 
> normally will only have close-to-zero values as we have transient 
> disconnects; if it is spiking, it means the brokers are consistently 
> unavailable, indicating the state.
> [~Yohan123] WDYT?





[GitHub] [kafka] OneCricketeer commented on a change in pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-05-14 Thread GitBox


OneCricketeer commented on a change in pull request #8489:
URL: https://github.com/apache/kafka/pull/8489#discussion_r425549885



##
File path: tests/docker/ducker-ak
##
@@ -42,7 +42,14 @@ docker_run_memory_limit="2000m"
 default_num_nodes=14
 
 # The default OpenJDK base image.
-default_jdk="openjdk:8"

Review comment:
   So, you could've kept this 

##
File path: tests/docker/ducker-ak
##
@@ -42,7 +42,14 @@ docker_run_memory_limit="2000m"
 default_num_nodes=14
 
 # The default OpenJDK base image.
-default_jdk="openjdk:8"
+case "$(uname -m)" in
+  aarch64)
+default_jdk="arm64v8/openjdk:11"
+;;
+  *)
+default_jdk="openjdk:11"

Review comment:
   Then the wild card wouldn't be necessary 

##
File path: tests/docker/ducker-ak
##
@@ -42,7 +42,14 @@ docker_run_memory_limit="2000m"
 default_num_nodes=14
 
 # The default OpenJDK base image.
-default_jdk="openjdk:8"
+case "$(uname -m)" in
+  aarch64)
+default_jdk="arm64v8/openjdk:11"

Review comment:
   Then do `default_jdk=arm64v8/${default_jdk}`









[GitHub] [kafka] OneCricketeer commented on a change in pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-05-14 Thread GitBox


OneCricketeer commented on a change in pull request #8489:
URL: https://github.com/apache/kafka/pull/8489#discussion_r425550291



##
File path: tests/docker/ducker-ak
##
@@ -42,7 +42,14 @@ docker_run_memory_limit="2000m"
 default_num_nodes=14
 
 # The default OpenJDK base image.
-default_jdk="openjdk:8"

Review comment:
   But 11 









[GitHub] [kafka] guozhangwang commented on a change in pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks

2020-05-14 Thread GitBox


guozhangwang commented on a change in pull request #8661:
URL: https://github.com/apache/kafka/pull/8661#discussion_r425549980



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##
@@ -248,17 +243,6 @@ void restoreAllInternal(final Collection> records) {
 final long segmentId = segments.segmentId(timestamp);
 final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
 if (segment != null) {
-// This handles the case that state store is moved to a new client and does not
-// have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
-// will only close the database and open it again with bulk loading enabled.
-if (!bulkLoadSegments.contains(segment)) {
-segment.toggleDbForBulkLoading(true);
-// If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that
-// makes the open flag for the newly created store.
-// if the store does exist already, then toggleDbForBulkLoading will make sure that
-// the store is already open here.
-bulkLoadSegments = new HashSet<>(segments.allSegments());
-}

Review comment:
   Actually, I'm now thinking that when we moved the `ChangelogReader` out 
of the stream thread, should we just consider removing the bulk loading logic 
for everyone?









[jira] [Commented] (KAFKA-7041) Using RocksDB bulk loading for StandbyTasks

2020-05-14 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107901#comment-17107901
 ] 

Guozhang Wang commented on KAFKA-7041:
--

I'd agree with your assessment. Actually, I'm thinking that, with KIP-441 and in 
the future with separate threads for restorations, do we really need to use 
bulk-loading any more for any scenario?

> Using RocksDB bulk loading for StandbyTasks
> ---
>
> Key: KAFKA-7041
> URL: https://issues.apache.org/jira/browse/KAFKA-7041
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Nikki Thean
>Priority: Major
>
> In KAFKA-5363 we introduced RocksDB bulk loading to speed up store recovery. 
> We could do the same optimization for StandbyTasks to make them more 
> efficient and to reduce the likelihood that StandbyTasks lag behind.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jiameixie commented on pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-05-14 Thread GitBox


jiameixie commented on pull request #8489:
URL: https://github.com/apache/kafka/pull/8489#issuecomment-629007067


   @OneCricketeer Yes, you're right.  I recommitted the pr and upgraded openjdk 
8 to openjdk 11.







[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2020-05-14 Thread Vince Mu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107900#comment-17107900
 ] 

Vince Mu commented on KAFKA-6520:
-

Can I be assigned this ticket? I would love to give it a shot. 




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-14 Thread GitBox


ableegoldman commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r425544374



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.List;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+abstract class AbstractReadOnlyDecorator extends WrappedStateStore {

Review comment:
   All of this (and the other new class below) was just copied over from 
ProcessorContextImpl. I changed the name to `AbstractXXXDecorator` since it's 
an abstract class, but it's all otherwise unchanged









[jira] [Commented] (KAFKA-7041) Using RocksDB bulk loading for StandbyTasks

2020-05-14 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107885#comment-17107885
 ] 

Sophie Blee-Goldman commented on KAFKA-7041:


> we should also be careful here: just calling onRestoreStart will disable 
>autocompactions


Wow, I predicted the future! Well more like the present since [this 
bug|https://issues.apache.org/jira/browse/KAFKA-9603] has been there since long 
before I made that comment...too bad I didn't actually look any closer at what 
I was talking about...

Anyways I left some more detailed thoughts on this on [Bruno's 
PR|https://github.com/apache/kafka/pull/8661#discussion_r424771783] but the 
tl;dr is "yes", we should close this ticket imo




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7912) In-memory key-value store does not support concurrent access

2020-05-14 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107884#comment-17107884
 ] 

Sophie Blee-Goldman commented on KAFKA-7912:


I think I actually did end up fixing this in a (hopefully!) 
non-performance-regressing way. Pretty sure it would be fixed for 2.3.1 and 
2.4...




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7985) Cleanup AssignedTasks / AbstractTask logic

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7985.

Resolution: Abandoned

Given that this ticket is old and the code was refactored multiple times 
already, I am closing this ticket.

> Cleanup AssignedTasks / AbstractTask logic
> --
>
> Key: KAFKA-7985
> URL: https://issues.apache.org/jira/browse/KAFKA-7985
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Today the lifetime of a task is:
> created -> [initializeStateStores] -> 
> restoring (writes to the initialized state stores) -> [initializeTopology] -> 
> running -> [closeTopology] -> 
> suspended -> [closeStateManager] -> 
> dead
> And hence the assigned tasks contain the following non-overlapping sets: 
> created, restoring, running, suspended (dead tasks do not need to be 
> maintained). Normally `created` should be empty, since once a task is created 
> it should transition to either restoring or running immediately. So 
> whenever we are suspending tasks, we should go through these sets and act 
> accordingly:
> 1. `created` and `suspended`: just check that these two sets are always empty.
> 2. `running`: transition to `suspended`.
> 3. `restoring`: transition to `suspended`. The difference here is that we 
> do not need to close the topology since it was not created yet at all; we just 
> need to remember the restored position, and keep the restorers on hold 
> instead of clearing all of them.
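The suspend rules listed in the ticket can be sketched as a small state function. This is an illustration only, not the AssignedTasks code: `SketchTaskState` and `TaskSuspendSketch` are hypothetical names, and treating a task found in `created` or `suspended` as an error is one assumed reading of "check these two sets are always empty".

```java
/** Hypothetical task states, following the lifecycle listed in the ticket. */
enum SketchTaskState { CREATED, RESTORING, RUNNING, SUSPENDED, DEAD }

final class TaskSuspendSketch {

    /** Only running tasks have an initialized topology that must be closed on suspend. */
    static boolean mustCloseTopology(final SketchTaskState state) {
        return state == SketchTaskState.RUNNING;
    }

    /** Returns the state after suspension, or throws for states that should never be seen here. */
    static SketchTaskState suspend(final SketchTaskState state) {
        switch (state) {
            case RUNNING:   // close the topology, then suspend
            case RESTORING: // no topology yet; keep the restorers on hold
                return SketchTaskState.SUSPENDED;
            default:
                // created and suspended sets are expected to be empty; dead tasks are not tracked
                throw new IllegalStateException("unexpected task state on suspend: " + state);
        }
    }

    public static void main(final String[] args) {
        for (final SketchTaskState state : new SketchTaskState[] {
                SketchTaskState.RUNNING, SketchTaskState.RESTORING}) {
            System.out.println(state + " -> " + suspend(state)
                + ", close topology: " + mustCloseTopology(state));
        }
    }
}
```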



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7912) In-memory key-value store does not support concurrent access

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7912:
---
Fix Version/s: (was: 2.3.0)




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7174) Improve version probing of subscription info

2020-05-14 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107880#comment-17107880
 ] 

Sophie Blee-Goldman commented on KAFKA-7174:


Technically w.r.t version probing we still just detect the case by comparing 
the version numbers rather than encoding the followup rebalance in the 
subscription. I considered changing this during 
[pull/8596|https://github.com/apache/kafka/pull/8596] but ultimately decided to 
leave this logic as is, since we need to process the version number anyway in 
order to detect that we need to downgrade.

I might be missing the real point of this ticket, but, I think we should close 
it as Not a Problem?

> Improve version probing of subscription info
> 
>
> Key: KAFKA-7174
> URL: https://issues.apache.org/jira/browse/KAFKA-7174
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Priority: Major
>  Labels: compatibility
>
> During code review for KAFKA-5037, [~guozhang] made the following suggestion:
> Currently the version probing works as follows:
> when the leader receives subscription info encoded with a higher version than 
> it can understand (e.g. the leader is on version 3, while one of the 
> subscriptions received is encoded with version 4), it will send back an empty 
> assignment encoded with version 3, and also with 
> latestSupportedVersion set to 3.
> when the member receives the assignment, it checks if latestSupportedVersion 
> is smaller than the version it used for encoding the sent subscription (i.e. 
> the above logic). If it is smaller, then it means that the leader cannot 
> understand, in this case, version 4. It will then set the flag and 
> re-subscribe with a downgraded encoding format of version 3.
> NOW with PR #5322, we can let the leader clearly communicate this error via 
> the error code, and upon receiving the assignment, if the error code is 
> VERSION_PROBING, then the member can immediately know what happened, and hence 
> the above logic can be simplified. 
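The member-side check described in the ticket boils down to a version comparison. A minimal sketch, assuming hypothetical names (`VersionProbingSketch`, `needsDowngrade`, `nextSubscriptionVersion`) that do not come from the Kafka code base:

```java
final class VersionProbingSketch {

    /** True if the leader reported a lower supported version than the member used to encode its subscription. */
    static boolean needsDowngrade(final int usedVersion, final int latestSupportedVersion) {
        return latestSupportedVersion < usedVersion;
    }

    /** Version the member should use for its next subscription. */
    static int nextSubscriptionVersion(final int usedVersion, final int latestSupportedVersion) {
        // Downgrade to what the leader understands; otherwise keep the current version.
        return needsDowngrade(usedVersion, latestSupportedVersion)
            ? latestSupportedVersion
            : usedVersion;
    }

    public static void main(final String[] args) {
        // Scenario from the ticket: the member encoded with version 4, the leader is on version 3.
        System.out.println("downgrade needed: " + needsDowngrade(4, 3));
        System.out.println("next version: " + nextSubscriptionVersion(4, 3));
    }
}
```

As the comment in this message notes, the version number has to be processed anyway to know which version to downgrade to, so an explicit error code would not remove this comparison.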



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7708) [kafka-streams-scala] Invalid signature for KTable join in 2.12

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7708.

Resolution: Abandoned

> [kafka-streams-scala] Invalid signature for KTable join in 2.12
> ---
>
> Key: KAFKA-7708
> URL: https://issues.apache.org/jira/browse/KAFKA-7708
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Edmondo Porcu
>Priority: Major
>  Labels: scala
>
> The signature in Scala 2.12 for the join in 
> org.kafka.streams.scala.streams.KTable cannot be resolved by the compiler, 
> probably due to the way parameter lists are handled by the compiler.
> See:
>  
> [https://github.com/scala/bug/issues/11288]
> [https://stackoverflow.com/questions/53615950/scalac-2-12-fails-to-resolve-overloaded-methods-with-multiple-argument-lists-whe]
>  
> We are wondering why this is not caught by the current Kafka build; we 
> are building on 2.12.7 as well.





[jira] [Resolved] (KAFKA-7445) Branch one Stream in multiple Streams

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7445.

Resolution: Not A Problem

> Branch one Stream in multiple Streams
> -
>
> Key: KAFKA-7445
> URL: https://issues.apache.org/jira/browse/KAFKA-7445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Dennis Reiter
>Priority: Minor
>
> Hi,
> I need to branch/split a KStream into multiple independent KStreams. I thought 
> {{org.apache.kafka.streams.kstream.internals.KStreamImpl#branch}} was the 
> right one, but in fact it's designed for another purpose.
> In contrast to {{branch}}, I need to assign the record to *all* matching 
> streams, not only one stream.
> Speaking in code 
> ({{org.apache.kafka.streams.kstream.internals.KStreamBranch}}):
> {code:java}
> if (predicates[i].test(key, value)) {
>// use forward with childIndex here
>// and pipe the record to multiple streams
>context().forward(key, value, i);
> }
> {code}
> My question: is this already possible with features included in 
> Streams? Or shall I propose a change?
> Thanks in advance
> Dennis
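
The difference between the two routing semantics in the question can be shown without Kafka at all. The sketch below is only an illustration under stated assumptions: the class and method names are hypothetical, and the "child index" is modelled as a plain list index rather than a real downstream processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical illustration of first-match vs. all-match routing; not Kafka code.
class BranchAllSketch {

    // branch-like semantics: the record goes to the first matching child only.
    static <K, V> List<Integer> firstMatch(List<BiPredicate<K, V>> predicates, K key, V value) {
        List<Integer> children = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                children.add(i);
                break;
            }
        }
        return children;
    }

    // Requested semantics: the record goes to every matching child.
    static <K, V> List<Integer> allMatches(List<BiPredicate<K, V>> predicates, K key, V value) {
        List<Integer> children = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                children.add(i);
            }
        }
        return children;
    }

    public static void main(String[] args) {
        List<BiPredicate<String, Integer>> predicates = new ArrayList<>();
        predicates.add((k, v) -> v > 0);      // child 0: positive values
        predicates.add((k, v) -> v % 2 == 0); // child 1: even values
        System.out.println(firstMatch(predicates, "key", 4));
        System.out.println(allMatches(predicates, "key", 4));
    }
}
```

In the Streams DSL, the all-match behaviour is usually obtained by calling filter once per predicate on the same KStream, which may be why the ticket was closed as Not A Problem.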





[GitHub] [kafka] wanwenli closed pull request #1779: KAFKA-4083: fix different replication factor during replica reassignment

2020-05-14 Thread GitBox


wanwenli closed pull request #1779:
URL: https://github.com/apache/kafka/pull/1779


   







[jira] [Commented] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2020-05-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107876#comment-17107876
 ] 

Matthias J. Sax commented on KAFKA-7271:


[~bbejeck] This is marked as blocker for 2.6 – are you planning to work on it?

> Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
> ---
>
> Key: KAFKA-7271
> URL: https://issues.apache.org/jira/browse/KAFKA-7271
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Assignee: Bill Bejeck
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.





[GitHub] [kafka] dengziming commented on pull request #7862: KAFKA-9246:Update Heartbeat timeout when ConsumerCoordinator commit offset

2020-05-14 Thread GitBox


dengziming commented on pull request #7862:
URL: https://github.com/apache/kafka/pull/7862#issuecomment-628997216


   @hachikuji hi Jason, PTAL, thank you.







[jira] [Resolved] (KAFKA-7203) Improve Streams StickyTaskAssignor

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7203.

Resolution: Fixed

> Improve Streams StickyTaskAssignor
> --
>
> Key: KAFKA-7203
> URL: https://issues.apache.org/jira/browse/KAFKA-7203
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> This discussion was inspired by the attempt to fix KAFKA-7144.
> Currently we do not strike a good trade-off between 
> stickiness and workload balance: we honor the former more than the 
> latter. One idea to improve on this is the following:
> {code}
> I'd like to propose a slightly different approach to fix 7144 while making 
> no-worse tradeoffs between stickiness and sub-topology balance. The key idea 
> is to adjust the assignment so that the distribution gets as close as possible to 
> the sub-topologies' num.tasks distribution.
> Here is a detailed workflow:
> 1. at the beginning, we first calculate for each client C how many tasks 
> it should ideally be assigned, as num.total_tasks / num.total_capacity * 
> C_capacity rounded down; call it C_a. Note that since we round down this 
> number, the sum of C_a across all C would be <= num.total_tasks, but this 
> does not matter.
> 2. then for each client C, based on its number of previously assigned tasks C_p, 
> we calculate how many tasks it should take over or give up as C_a - C_p (if 
> it is positive, it should take over some, otherwise it should give up some).
> Note that because of the rounding down, when we calculate C_a - C_p for each 
> client, we need to make sure that the total number of give-ups and the total 
> number of take-overs are equal; some ad-hoc heuristics can be used.
> 3. then we calculate the task distribution across the sub-topologies as a 
> whole. For example, if we have three sub-topologies, st0, st1 and st2, and st0 has 
> 4 total tasks, st1 has 4 total tasks, and st2 has 8 total tasks, then the 
> distribution between st0, st1 and st2 should be 1:1:2. Let's call it the 
> global distribution, and note that currently, since num.tasks per sub-topology 
> never changes, this distribution should NEVER change.
> 4. then for each client that should give up some, we decide which tasks it 
> should give up so that the remaining task distribution is proportional to 
> the above global distribution.
> For example, if a client previously owned 4 tasks of st0, no tasks of st1, and 
> 2 tasks of st2, and now it needs to give up 3 tasks, it should then give up 2 
> of st0 and 1 of st1, so that the remaining distribution is closer to 1:1:2.
> 5. now that we've collected a list of given-up tasks plus the ones that do not 
> have any prev active assignment (in normal operation this should not happen, 
> since all tasks should have been created since day one), we migrate them 
> to those clients that need to take over some, similarly proportional to the global 
> distribution.
> For example, if a client previously owned 1 task of st0 and nothing of st1 and 
> st2, and now it needs to take over 3 tasks, we would try to give it 1 task of 
> st1 and 2 tasks of st2, so that the resulting distribution becomes 1:1:2. And 
> we ONLY consider prev-standby tasks when we decide which one of st1 / st2 
> we should get for that client.
> Now, consider the following scenarios:
> a) this is a clean start and there is no prev-assignment at all; step 4 would 
> be a no-op; the result should still be fine.
> b) a client leaves the group; no client needs to give up and all clients may 
> need to take over some, so step 4 is a no-op, and the list accumulated for step 5 only 
> contains the tasks of the departed client.
> c) a new client joins the group; all clients need to give up some, and only 
> the new client needs to take over all the given-up ones. Hence step 5 is 
> straightforward.
> {code}
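
Steps 1 to 3 of the proposal are plain arithmetic and can be sketched as follows. This is a minimal illustration, not the StickyTaskAssignor code: the class and method names are hypothetical, and the client capacities and previous assignments in main are made-up example values (only the 4/4/8 sub-topology sizes come from the proposal).

```java
import java.util.Arrays;

// Hypothetical sketch of steps 1-3 of the proposal; not the Streams assignor.
class StickyBalanceSketch {

    // Step 1: ideal number of tasks for a client (C_a), rounded down.
    static int idealTasks(int totalTasks, int totalCapacity, int clientCapacity) {
        return (int) ((long) totalTasks * clientCapacity / totalCapacity);
    }

    // Step 2: C_a - C_p; positive means "take over", negative means "give up".
    static int delta(int ideal, int previouslyAssigned) {
        return ideal - previouslyAssigned;
    }

    // Euclid's algorithm, used to reduce the task counts to a ratio.
    static int gcd(int a, int b) {
        while (b != 0) {
            int t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    // Step 3: global distribution across sub-topologies, e.g. 4:4:8 -> 1:1:2.
    static int[] globalDistribution(int[] tasksPerSubTopology) {
        int g = 0;
        for (int n : tasksPerSubTopology) {
            g = gcd(g, n);
        }
        int[] ratio = new int[tasksPerSubTopology.length];
        for (int i = 0; i < ratio.length; i++) {
            ratio[i] = g == 0 ? 0 : tasksPerSubTopology[i] / g;
        }
        return ratio;
    }

    public static void main(String[] args) {
        int totalTasks = 16;              // 4 + 4 + 8 tasks
        int[] capacity = {1, 1, 2};       // example client capacities
        int[] previous = {6, 2, 8};       // example previous assignment sizes
        int totalCapacity = Arrays.stream(capacity).sum();
        for (int c = 0; c < capacity.length; c++) {
            int ideal = idealTasks(totalTasks, totalCapacity, capacity[c]);
            System.out.println("client " + c + ": ideal=" + ideal
                + " delta=" + delta(ideal, previous[c]));
        }
        System.out.println(Arrays.toString(globalDistribution(new int[] {4, 4, 8})));
    }
}
```

The sketch stops before steps 4 and 5, since choosing which concrete tasks to give up or take over depends on heuristics the proposal leaves open.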





[jira] [Resolved] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7209.

Resolution: Abandoned

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use Kafka Streams 0.11.0.1 with session timeout 6, retries set to int.max, 
> and the default backoff time.
>  
> I have 3 nodes running a Kafka cluster of 3 brokers, and I am running 3 Kafka 
> Streams instances with the same application.id.
> Each node has one broker and one Kafka Streams application.
> Everything works fine during setup.
> When I bring down one node, one Kafka broker and one streaming app go down.
> I then see exceptions in the other two streaming apps, and the group never 
> rebalances; I waited for hours and it never came back to normal.
> Is there anything I am missing?
> I also tried calling stream.close, cleanup and restart when one broker is 
> down; this doesn't help either.
> Can anyone help me?
>  
> One thing I observed lately is that Kafka topics with one partition get 
> reassigned, but I have topics with 16 partitions and replication factor 3. It 
> never settles.





[jira] [Commented] (KAFKA-7174) Improve version probing of subscription info

2020-05-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107869#comment-17107869
 ] 

Matthias J. Sax commented on KAFKA-7174:


[~guozhang] [~vvcephei] [~ableegoldman] Given the latest work on rebalancing, 
is this ticket still valid?

> Improve version probing of subscription info
> 
>
> Key: KAFKA-7174
> URL: https://issues.apache.org/jira/browse/KAFKA-7174
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Priority: Major
>  Labels: compatibility
>
> During code review for KAFKA-5037, [~guozhang] made the following suggestion:
> Currently the version probing works as follows:
> when the leader receives subscription info encoded with a higher version than 
> it can understand (e.g. the leader is on version 3, while one of the 
> subscriptions received is encoded with version 4), it will send back an empty 
> assignment encoded with version 3, and with 
> latestSupportedVersion set to 3.
> when the member receives the assignment, it checks whether latestSupportedVersion 
> is smaller than the version it used for encoding the sent subscription (i.e. 
> the above logic). If it is smaller, then the leader cannot 
> understand, in this case, version 4. The member will then set the flag and 
> re-subscribe with a downgraded encoding format of version 3.
> NOW with PR #5322, we can let the leader clearly communicate this error via 
> the error code, and upon receiving the assignment, if the error code is 
> VERSION_PROBING, then the member immediately knows what happened, and hence 
> we can simplify the above logic.





[jira] [Resolved] (KAFKA-7133) DisconnectException every 5 minutes in single restore consumer thread

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7133.

Resolution: Abandoned

> DisconnectException every 5 minutes in single restore consumer thread
> -
>
> Key: KAFKA-7133
> URL: https://issues.apache.org/jira/browse/KAFKA-7133
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: Kafka Streams application in Kubernetes.
> Kafka Server in Docker on machine in host mode
>Reporter: Chris Schwarzfischer
>Priority: Major
>
> One of our streams applications (and only this one) gets a 
> {{org.apache.kafka.common.errors.DisconnectException}} almost exactly every 5 
> minutes.
> The application has two of
> KStream -> KGroupedStream -> KTable -> KGroupedTable -> KTable
> aggregations.
> Relevant config is in Streams:
> {code:java}
> this.properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.AT_LEAST_ONCE);
> //...
> this.properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
> this.properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 
> 1024 * 500 /* 500 MB */ );
> this.properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 
> 1024 * 100 /* 100 MB */);
> this.properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024 * 1024 * 50 
> /* 50 MB */);
> {code}
> On the broker:
> {noformat}
> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
> KAFKA_OFFSETS_RETENTION_MINUTES: 108000
> KAFKA_MIN_INSYNC_REPLICAS: 2
> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
> KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 2147483000
> KAFKA_LOG_RETENTION_HOURS: 2688
> KAFKA_OFFSETS_RETENTION_CHECK_INTERVAL_MS: 120
> KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 12000
> {noformat}
> Logging gives us a single restore consumer thread that throws exceptions 
> every 5 mins:
>  
> {noformat}
> July 4th 2018, 15:38:51.560   dockertest03   2018-07-04T13:38:51,559Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=317141939, epoch=INITIAL) 
> to node 1: org.apache.kafka.common.errors.DisconnectException.
> July 4th 2018, 15:37:54.833   dockertest03   2018-07-04T13:37:54,832Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=2064325970, epoch=INITIAL) 
> to node 3: org.apache.kafka.common.errors.DisconnectException.
> July 4th 2018, 15:37:54.833   dockertest03   2018-07-04T13:37:54,832Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=1735432619, epoch=INITIAL) 
> to node 2: org.apache.kafka.common.errors.DisconnectException.
> July 4th 2018, 15:32:26.379   dockertest03   2018-07-04T13:32:26,378Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=317141939, epoch=INITIAL) 
> to node 1: org.apache.kafka.common.errors.DisconnectException.
> July 4th 2018, 15:32:01.926   dockertest03   2018-07-04T13:32:01,925Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=1735432619, epoch=INITIAL) 
> to node 2: org.apache.kafka.common.errors.DisconnectException.
> July 4th 2018, 15:32:01.926   dockertest03   2018-07-04T13:32:01,925Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=2064325970, epoch=INITIAL) 
> to node 

[jira] [Commented] (KAFKA-7041) Using RocksDB bulk loading for StandbyTasks

2020-05-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107868#comment-17107868
 ] 

Matthias J. Sax commented on KAFKA-7041:


Given that bulk-loading results in an increased number of open files, should we 
close this ticket?

> Using RocksDB bulk loading for StandbyTasks
> ---
>
> Key: KAFKA-7041
> URL: https://issues.apache.org/jira/browse/KAFKA-7041
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Nikki Thean
>Priority: Major
>
> In KAFKA-5363 we introduced RocksDB bulk loading to speed up store recovery. 
> We could do the same optimization for StandbyTasks to make them more 
> efficient and to reduce the likelihood that StandbyTasks lag behind.





[jira] [Resolved] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7035.

Resolution: Abandoned

> Kafka Processor's init() method sometimes is not called
> ---
>
> Key: KAFKA-7035
> URL: https://issues.apache.org/jira/browse/KAFKA-7035
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Oleksandr Konopko
>Priority: Critical
> Attachments: TransformProcessor.java
>
>
> Scenario:
> 1. We process a Kafka topic with the Processor API
> 2. We want to collect metrics (let's say just count the number of processed 
> entities, for simplicity)
> 3. How we tried to organize this
>  * process data with the process() method and send it down the stream with the context
>  * on each call of process(), update the counter
>  * schedule a punctuate function which sends the metric to a special topic. The metric 
> is built from the counter
> You can find the code (we removed all business-sensitive code from it, so 
> it should be easy to read) in the attachment
>  
> Problematic Kafka Streams behaviour that I can see by logging every step:
> 1. We have 80 messages in the input topic
> 2. Kafka Streams creates 4 Processor instances. Let's name them ProcessorA, 
> ProcessorB, ProcessorC and ProcessorD
> 3. ProcessorA and ProcessorB receive 1-5% of the data. Data is processed 
> correctly, results are sent down the stream. The counter is updated
> 4. init() method was not called for ProcessorA and ProcessorB
> 5. ProcessorC and ProcessorD are created and they start to receive all the 
> rest of the data, 95-99%
> 6. init() method is called for both ProcessorC and ProcessorD. It initiates 
> punctuation, which causes a Metrics message to be created and sent down the metric 
> stream periodically
> 7. ProcessorA and ProcessorB are closed. init() was never called for them, so 
> the Metric entity was not sent to the metrics topic
> 8. Processing is finished.
>  
> In the end:
> Expected:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to 80
> Actual results:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to some number 3-6% less 
> than 80, for example 786543
>  
> Problem:
>  * init() method call is not guaranteed
>  * there is no way to guarantee that all work was done by punctuate method 
> before close()
>  
>  
>  





[jira] [Resolved] (KAFKA-7015) Enhance RecordCollectorImpl exceptions with more context information

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7015.

Resolution: Not A Problem

We should not log sensitive information such as keys and values, and we have 
in fact removed some log statements that may leak key/value information. 
Hence, closing this ticket.

> Enhance RecordCollectorImpl exceptions with more context information  
> -
>
> Key: KAFKA-7015
> URL: https://issues.apache.org/jira/browse/KAFKA-7015
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Minor
>
> In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and 
> only have concrete key/value types on outer layers/wrappers of the stores.
> For this reason, the innermost {{RocksDBStore}} cannot provide useful error 
> messages anymore if a put/get/delete operation fails, as it only handles plain 
> bytes.
> In addition, the corresponding calls that send changelog records to the record 
> collector also pass byte arrays only, and hence when an 
> error happens, the record collector can only display the key but not the 
> value, since it is all bytes:
> {code:java}
> [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl   -
> task [2_2] Error sending record (key {"eventId":XXX,"version":123}
> value [] timestamp YYY) to topic TTT
> due to ...
> {code}
> The store exceptions got fixed via KAFKA-6538.
> This Jira is to track the fix for RecordCollectorImpl.





[jira] [Resolved] (KAFKA-7004) Support configurable restore consumer poll timeout

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7004.

Resolution: Fixed

> Support configurable restore consumer poll timeout
> --
>
> Key: KAFKA-7004
> URL: https://issues.apache.org/jira/browse/KAFKA-7004
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Shawn Nguyen
>Priority: Minor
>  Labels: Needs-kip
>
> In the StoreChangelogReader, the restore consumer is currently hard coded to 
> poll for 10ms at most per call.
> {code:java}
> public Collection<TopicPartition> restore(final RestoringTasks active) {
>     if (!needsInitializing.isEmpty()) {
>         initialize();
>     }
>     if (needsRestoring.isEmpty()) {
>         restoreConsumer.unsubscribe();
>         return completed();
>     }
>     final Set<TopicPartition> restoringPartitions =
>         new HashSet<>(needsRestoring.keySet());
>     try {
>         final ConsumerRecords<byte[], byte[]> allRecords =
>             restoreConsumer.poll(10);
>         for (final TopicPartition partition : restoringPartitions) {
>             restorePartition(allRecords, partition,
>                 active.restoringTaskFor(partition));
>         }
> ...{code}
>  
> It'd be nice to be able to configure the restore consumer to poll for a 
> larger timeout (e.g. 500ms) to give it more time to accumulate records for 
> the restoration task. In the main event loop for polling in 
> StreamThread.java, the main consumer uses the POLL_MS_CONFIG set in 
> StreamsConfig.java to configure the max poll timeout. We could construct a 
> similar config in the StreamsConfig class, but prefixed with the processing 
> type (restore in this case). Let me know if this sounds reasonable, and I'll 
> create a KIP and PR.
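
The prefixed-config idea from the ticket could look roughly like the sketch below. It is only an illustration under stated assumptions: the "restore." prefix, the resulting key "restore.poll.ms", and the class and method names are hypothetical and not part of StreamsConfig; the default of 10 ms mirrors the hard-coded value quoted above, and "poll.ms" is the key behind POLL_MS_CONFIG.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a prefixed restore poll timeout; not StreamsConfig code.
class RestorePollConfigSketch {

    static final String POLL_MS = "poll.ms";          // key used by the main consumer loop
    static final String RESTORE_PREFIX = "restore.";  // assumed prefix, for illustration only
    static final long DEFAULT_RESTORE_POLL_MS = 10L;  // the currently hard-coded value

    // Returns the restore poll timeout, falling back to the hard-coded default.
    static long restorePollMs(Map<String, ?> config) {
        Object value = config.get(RESTORE_PREFIX + POLL_MS);
        if (value == null) {
            return DEFAULT_RESTORE_POLL_MS;
        }
        return Long.parseLong(value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        System.out.println(restorePollMs(config)); // falls back to the default
        config.put(RESTORE_PREFIX + POLL_MS, 500);
        System.out.println(restorePollMs(config)); // uses the configured value
    }
}
```

Keeping the old value as the default means existing applications would see no behaviour change unless they set the new key.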





[jira] [Resolved] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6977.

Resolution: Abandoned

>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> -
>
> Key: KAFKA-6977
> URL: https://issues.apache.org/jira/browse/KAFKA-6977
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>  Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> partition assignment took 40 ms.
> current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
> 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 
> 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31]
> current standby tasks: []
> previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 
> 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28]
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State 
> transition from REBALANCING to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> ERROR org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> Encountered the following error during processing:
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Shutting down
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from RUNNING to PENDING_SHUTDOWN.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Stream thread shutdown complete
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from PENDING_SHUTDOWN to DEAD.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4] State 
> transition from RUNNING to ERROR.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> WARN org.apache.kafka.streams.KafkaStreams - stream-client 
> 

[jira] [Commented] (KAFKA-4748) Need a way to shutdown all workers in a Streams application at the same time

2020-05-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107865#comment-17107865
 ] 

Matthias J. Sax commented on KAFKA-4748:


If we have a mechanism to stop all threads/instances if an error occurs (via 
KAFKA-6943), we could also add a public API to trigger the same mechanism.

> Need a way to shutdown all workers in a Streams application at the same time
> 
>
> Key: KAFKA-4748
> URL: https://issues.apache.org/jira/browse/KAFKA-4748
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Elias Levy
>Priority: Major
>
> If you have a fleet of Streams workers for an application and attempt to shut 
> them down simultaneously (e.g. via SIGTERM and 
> Runtime.getRuntime().addShutdownHook() and streams.close()), a large number 
> of the workers fail to shut down.
> The problem appears to be a race condition between the shutdown signal and 
> the consumer rebalancing that is triggered by some of the workers exiting 
> before others.  Workers that receive the signal later apparently fail to 
> exit because they are caught in the rebalance.
> Terminating workers in a rolling fashion is not advisable in some situations. 
>  The rolling shutdown will result in many unnecessary rebalances and may 
> fail, as the application may have a large amount of local state that a smaller 
> number of nodes may not be able to store.
> It would appear that there is a need for a protocol change to allow the 
> coordinator to signal a consumer group to shut down without leading to 
> rebalancing.
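
For reference, the per-worker shutdown pattern mentioned in the description (a JVM shutdown hook that closes the application on SIGTERM) looks roughly like this. The sketch is self-contained and uses a generic AutoCloseable as a stand-in for the streams instance; the class and method names are hypothetical.

```java
// Hypothetical sketch of the shutdown-hook pattern; AutoCloseable stands in
// for the streams instance, whose close() would be called on SIGTERM.
class ShutdownHookSketch {

    // Registers a JVM shutdown hook that closes the given resource and
    // returns the hook thread so callers can remove it again if needed.
    static Thread registerCloseHook(AutoCloseable resource) {
        Thread hook = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "close-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        registerCloseHook(() -> System.out.println("closing worker"));
        System.out.println("worker running; the hook runs when the JVM exits");
    }
}
```

Each worker closes independently in this pattern, which is exactly why the workers that exit first can trigger a rebalance among those still shutting down.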





[jira] [Resolved] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6822.

Resolution: Abandoned

> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, streams
>Reporter: Phil Mikhailov
>Priority: Major
>
> We have microservices that use Kafka Streams and get stuck during initialization 
> of the stream topology while filling a StateStore from Kafka using KafkaConsumer. 
> The microservices are built with Kafka Streams 0.10.2.1-cp1 (Confluent 3.2.1), but 
> the environment runs Kafka cluster 1.0.0 (Confluent 4.0.0). 
> We reproduced this problem several times by restarting the microservices and 
> eventually had to reset the stream offsets to the beginning in order to unblock 
> them.
> While investigating this problem more deeply, we found that the StateStore 
> (0.10.2.1) gets stuck loading data using {{ProcessorStateManager}}. It uses 
> KafkaConsumer (0.10.2.1) to fill the store, and the consumer calculates offsets like 
> this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> It takes the latest offset from the records (returned by {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0, by contrast, the broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset rather than an estimate.
> That said, we had a situation where the StateStore (0.10.2.1) got stuck loading 
> data. The reason was in {{ProcessorStateManager.restoreActiveState:245}}, 
> which kept spinning in the consumer loop because the following condition never 
> became true:
> {code:java}
>    } else if (restoreConsumer.position(storePartition) == endOffset) {
>    break;
>    }
> {code}
>  
> We assume that the 0.10.2.1 consumer estimates the end offset incorrectly when 
> the topic is compacted. 
> Alternatively, the offset calculation is inconsistent between 0.10.2.1 and 
> 1.0.0, which prevents using a 0.10.2.1 consumer with a 1.0.0 broker.
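
The suspected mismatch can be illustrated with a minimal, self-contained sketch in plain Java (no Kafka dependency). The class and method names are hypothetical, and the premise that the last offsets before the end offset are not readable is the reporter's hypothesis, not a confirmed root cause. It only shows that a position estimated as "last returned offset + 1" can stop short of the end offset, so an equality check never becomes true.

```java
import java.util.Arrays;
import java.util.List;

final class RestorePositionSketch {

    // 0.10.2.1-style estimate quoted in the ticket: last returned offset plus one.
    static long estimatedNextOffset(List<Long> returnedOffsets) {
        return returnedOffsets.get(returnedOffsets.size() - 1) + 1;
    }

    // Termination check quoted in the ticket: restore ends only on exact equality.
    static boolean restoreFinished(long position, long endOffset) {
        return position == endOffset;
    }

    public static void main(String[] args) {
        // Hypothetical changelog: offsets 8 and 9 are assumed to be no longer readable.
        List<Long> returnedOffsets = Arrays.asList(0L, 3L, 7L);
        long endOffset = 10L;

        long estimated = estimatedNextOffset(returnedOffsets);
        // Assumption: a newer client is told to continue from the end offset.
        long provided = endOffset;

        System.out.println("estimated position finishes restore: "
                + restoreFinished(estimated, endOffset));
        System.out.println("provided position finishes restore: "
                + restoreFinished(provided, endOffset));
    }
}
```

Under these assumptions the estimated position stays at 8 while the end offset is 10, which would match the endless loop described in the ticket.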





[jira] [Resolved] (KAFKA-6888) java.lang.UnsatisfiedLinkError: librocksdbjni4271925274440341234.dll: âÑX

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6888.

Resolution: Abandoned

> java.lang.UnsatisfiedLinkError: librocksdbjni4271925274440341234.dll:  âÑX
> --
>
> Key: KAFKA-6888
> URL: https://issues.apache.org/jira/browse/KAFKA-6888
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.1.0
> Environment: Windows 7, Java 8, Kafka-Streams 1.1.0 and 0.11.0.0 
> versions
>Reporter: Sandeep Kapoor
>Priority: Major
>
> Hi Team,
> I am a Windows 7 user and am using *kafka-streams* version 1.1.0 (the latest). 
> When I execute my code, I get the error "StreamThread-1" 
> **java.lang.UnsatisfiedLinkError:** 
> *C:\Users\sandeep\AppData\Local\Temp\librocksdbjni4271925274440341234.dll:  
> âÑX*". Earlier I was running my code with version 0.11.0.0 and was getting 
> "*RocksDB on windows (librocksdbjni-win64.dll)- Can't find dependent 
> libraries*". Then, following the thread at 
> https://github.com/facebook/rocksdb/issues/1302, I downloaded and installed 
> the **Visual C++ runtime for Visual Studio 2015** and upgraded kafka-streams 
> from 0.11.0.0 to 1.1.0. But I still get the same exception 
> (java.lang.UnsatisfiedLinkError). The only thing that changed is that instead 
> of "Can't find dependent libraries" I now get "âÑX" (a strange expression).
> I also checked that my Maven dependencies use RocksDbJni version 5.7.3, and 
> that it contains the "librocksdbjni-win64.dll" file as well.
> *Please find below the exception I am getting (with kafka-streams version 
> 1.1.0):*
> Exception in thread 
> "streams-starter-app-862326ca-30c0-468d-898c-e40d4578f1c7-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: 
> C:\Users\sandeep\AppData\Local\Temp\librocksdbjni4271925274440341234.dll:  âÑX
>      at java.lang.ClassLoader$NativeLibrary.load(Native Method)
>      at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1929)
>      at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1814)
>      at java.lang.Runtime.load0(Runtime.java:809)
>      at java.lang.System.load(System.java:1083)
>      at 
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
>      at 
> org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
>      at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
>      at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
>      at org.rocksdb.Options.<clinit>(Options.java:25)
>      at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:116)
>      at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:167)
>      at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
>      at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
>      at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.init(InnerMeteredKeyValueStore.java:160)
>      at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.init(MeteredKeyValueBytesStore.java:102)
>      at 
> org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:225)
>      at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:162)
>      at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:88)
>      at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:316)
>      at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
>      at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>      at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
>  INFO stream-client 
> [streams-starter-app-862326ca-30c0-468d-898c-e40d4578f1c7] State transition 
> from ERROR to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:261)
> *Please find below the exception (with kafka-streams version 0.11.0.0, 
> prior to installing "Visual C++ runtime for Visual Studio 2015"):*
> WARN stream-thread 
> [streams-starter-app-4be266bc-fcc9-4c1e-93be-807e3736d6cb-StreamThread-1] 
> Unexpected state transition from ASSIGNING_PARTITIONS to DEAD. 
> (org.apache.kafka.streams.processor.internals.StreamThread:978) 
>  INFO stream-client 
> [streams-starter-app-4be266bc-fcc9-4c1e-93be-807e3736d6cb] State transition 
> from REBALANCING to PENDING_SHUTDOWN. 
> (org.apache.kafka.streams.KafkaStreams:229) 
>  java.lang.UnsatisfiedLinkError: 
> 

[jira] [Resolved] (KAFKA-6821) The producer attempted to use a producer id which is not currently assigned to its transactional id

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6821.

Resolution: Cannot Reproduce

A lot of work on EOS was done recently. Closing this ticket as it should not be 
an issue any longer.

> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id 
> 
>
> Key: KAFKA-6821
> URL: https://issues.apache.org/jira/browse/KAFKA-6821
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Centos 7
>Reporter: RandySun
>Priority: Trivial
>  Labels: eos
>
> I use Kafka Streams to join two KStreams; however, I found the following error 
> stack trace in my application log:
> {code:java}
> Aborting producer batches due to fatal error 
> org.apache.kafka.common.KafkaException: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id 
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1037)
>  
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905)
>  
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) 
> at java.lang.Thread.run(Thread.java:745) 
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id 
> {code}
>  
> There is no evidence of any other error in my application, and the Kafka 
> controller.log says:
>  
>  
> {code:java}
> [2018-04-17 17:54:33,212] DEBUG [Controller id=2] Preferred replicas by 
> broker Map(2 -> Map(__consumer_offsets-19 -> Vector(2, 3, 1), 
> com.randy.Demo1-KSTREAM-FLATMAP-01-repartition-1 -> Vector(2)
> [2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred 
> replica Map() (kafka.controller.KafkaController)
> [2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for 
> broker 2 is 0.0 (kafka.controller.KafkaController)
> [2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred 
> replica Map() (kafka.controller.KafkaController)
> [2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for 
> broker 1 is 0.0 (kafka.controller.KafkaController)
> [2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred 
> replica Map() (kafka.controller.KafkaController)
> [2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for 
> broker 3 is 0.0 (kafka.controller.KafkaController)
> {code}
>  





[jira] [Commented] (KAFKA-5578) Streams Task Assignor should consider the staleness of state directories when allocating tasks

2020-05-14 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107862#comment-17107862
 ] 

Sophie Blee-Goldman commented on KAFKA-5578:


Yup, this pretty much sums up what we've implemented in KIP-441

> Streams Task Assignor should consider the staleness of state directories when 
> allocating tasks
> --
>
> Key: KAFKA-5578
> URL: https://issues.apache.org/jira/browse/KAFKA-5578
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Damian Guy
>Priority: Major
>
> During task assignment we use the presence of a state directory to assign 
> precedence to which instances should be assigned the task. We first choose 
> previous active tasks, but then fall back to the existence of a state dir. 
> Unfortunately we don't take into account the recency of the data from the 
> available state dirs. So in the case where a task has run on many instances, 
> it may be that we choose an instance that has relatively old data.
> When doing task assignment we should take into consideration the age of the 
> data in the state dirs. We could use the data from the checkpoint files to 
> determine which instance is most up-to-date and attempt to assign accordingly 
> (obviously making sure that tasks are still balanced across available 
> instances)
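
The idea of preferring the instance with the freshest state can be sketched as follows. This is a minimal illustration only, not the KIP-441 implementation: the class name, the instance names, and the assumption that each instance reports a single checkpointed changelog offset per task are all hypothetical, and balancing across instances is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class CheckpointRecencySketch {

    // Returns the instance with the highest checkpointed offset for one task,
    // or an empty Optional if no instance has any state for it.
    static Optional<String> mostUpToDate(Map<String, Long> checkpointedOffsetByInstance) {
        return checkpointedOffsetByInstance.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        // Hypothetical checkpoint data for a single task.
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("instance-a", 120L);
        offsets.put("instance-b", 950L);
        offsets.put("instance-c", 400L);

        System.out.println(mostUpToDate(offsets).orElse("none"));
    }
}
```

A real assignor would have to combine this preference with the balance constraint mentioned in the ticket.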





[jira] [Resolved] (KAFKA-6804) Event values should not be included in log messages

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6804.

Resolution: Fixed

I believe this is fixed via KAFKA-8501.

> Event values should not be included in log messages
> ---
>
> Key: KAFKA-6804
> URL: https://issues.apache.org/jira/browse/KAFKA-6804
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Jussi Lyytinen
>Priority: Major
>
> In certain error situations, event values are included in log messages:
> {code:java}
> 2018-04-19 08:00:28 
> [my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] ERROR 
> o.a.k.s.p.i.AssignedTasks - stream-thread 
> [my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] Failed to 
> commit stream task 1_1 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending 
> since an error caught with a previous record (key [my-key] value [my-value] 
> ...
> {code}
> In some environments, this is highly undesired behavior since the values can 
> contain sensitive information. Error logs are usually collected to separate 
> systems not meant for storing such information (e.g. patient or financial 
> data).
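
One way to keep record contents out of error logs is to log only metadata and payload sizes. The sketch below is a hypothetical helper in plain Java; it is not the change made in KAFKA-8501, and the class name, method name, and message format are assumptions chosen for illustration.

```java
import java.nio.charset.StandardCharsets;

final class RedactedRecordLogSketch {

    // Describes a record by its coordinates and payload sizes only,
    // so that neither the key nor the value ends up in the log.
    static String describe(String topic, int partition, long offset, byte[] key, byte[] value) {
        return String.format("topic=%s partition=%d offset=%d keySize=%d valueSize=%d",
                topic, partition, offset, size(key), size(value));
    }

    // Uses -1 for a null payload, so null stays distinguishable from empty.
    private static int size(byte[] bytes) {
        return bytes == null ? -1 : bytes.length;
    }

    public static void main(String[] args) {
        byte[] key = "my-key".getBytes(StandardCharsets.UTF_8);
        byte[] value = "my-value".getBytes(StandardCharsets.UTF_8);
        System.out.println("Abort sending since an error was caught with a previous record ("
                + describe("events", 1, 42L, key, value) + ")");
    }
}
```

The topic, partition, and offset are usually enough to locate the record for debugging, while the sensitive payload stays in Kafka.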





[jira] [Resolved] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6461.

Resolution: Cannot Reproduce

> TableTableJoinIntegrationTest is unstable if caching is enabled
> ---
>
> Key: KAFKA-6461
> URL: https://issues.apache.org/jira/browse/KAFKA-6461
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: flaky-test
> Fix For: 1.1.0
>
>
> {noformat}
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
> testLeftInner[caching enabled = true] FAILED
> 20:41:05 java.lang.AssertionError: Condition not met within timeout 
> 15000. Never received expected final result.
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> 20:41:05 at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
> 20:41:05 at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}





[jira] [Resolved] (KAFKA-6237) stream stopped working after exception: Cannot execute transactional method because we are in an error state

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6237.

Resolution: Cannot Reproduce

There was a lot of EOS work recently. Closing this ticket. Should not be an 
issue any longer.

> stream stopped working after exception: Cannot execute transactional method 
> because we are in an error state
> 
>
> Key: KAFKA-6237
> URL: https://issues.apache.org/jira/browse/KAFKA-6237
> Project: Kafka
>  Issue Type: Bug
>  Components: core, streams
>Reporter: DHRUV BANSAL
>Priority: Critical
>  Labels: exactly-once
> Attachments: nohup.out
>
>
> 017-11-19 07:52:44,673 
> [project_logs_stream-a30ea242-3c9f-46a9-a01c-51903bd40ca5-StreamThread-1] 
> ERROR: org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [orion_logs_stream-a30ea242-3c9f-46a9-a01c-51903bd40ca5-StreamThread-1] 
> Failed while closing StreamTask 0_1:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method 
> because we are in an error state
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:524)
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:198)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:598)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:434)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:1086)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:1041)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:538)
> Caused by: org.apache.kafka.common.KafkaException: Unexpected error in 
> AddOffsetsToTxnResponse: The server experienced an unexpected error when 
> processing the request
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$AddOffsetsToTxnHandler.handleResponse(TransactionManager.java:978)
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:648)
>   at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
>   at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:206)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>   at java.lang.Thread.run(Thread.java:745)
> Also when I see the state of the corresponding consumer group it is saying:
> +Warning: Consumer group  is rebalancing.+





[jira] [Resolved] (KAFKA-6182) Automatic co-partitioning of topics via automatic intermediate topic with matching partitions

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6182.

Resolution: Fixed

KIP-221 is implemented now. Closing this ticket.

> Automatic co-partitioning of topics via automatic intermediate topic with 
> matching partitions
> -
>
> Key: KAFKA-6182
> URL: https://issues.apache.org/jira/browse/KAFKA-6182
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>Priority: Major
>
> Currently it is up to the user to ensure that the two input topics for a join 
> have the same number of partitions and, if they don't, to manually create an 
> intermediate topic, send the stream #through that topic first, and then 
> perform the join.
> It would be great to have Kafka Streams detect this and at least give the 
> user the option to create an intermediate topic automatically with the same 
> number of partitions as the topic being joined with.
> See 
> https://docs.confluent.io/current/streams/developer-guide.html#joins-require-co-partitioning-of-the-input-data





[jira] [Resolved] (KAFKA-6125) Avoid third party exception to flow through streams code base

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6125.

Resolution: Not A Problem

> Avoid third party exception to flow through streams code base
> -
>
> Key: KAFKA-6125
> URL: https://issues.apache.org/jira/browse/KAFKA-6125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: streams-exception-handling
>
> Streams uses multiple internal clients that might throw fatal exceptions (some 
> should actually never occur, and if they do, this would indicate a bug).
> We should wrap all calls to these clients with a {{try-catch}} and log 
> those exceptions as ERRORs immediately. For exceptions that can only occur 
> due to a bug (e.g., IllegalStateException, IllegalArgumentException, 
> WakeupException, InterruptException) we should ask users in the log message 
> to report this as a bug.
> Last, we should rethrow all those exceptions as {{StreamsException}} (to avoid 
> a standard library exception being caught by accident somewhere else in 
> our code base).
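
The proposed wrapping could look roughly like the sketch below. It is self-contained, so StreamsException here is a local stand-in for Kafka's class of the same name, and the helper name callClient is hypothetical. Only the two JDK exception types from the list above are treated as bug indicators, because the Kafka-specific ones are not available without the client library.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

final class ClientCallWrapperSketch {

    // Local stand-in so the sketch compiles without Kafka on the classpath.
    static final class StreamsException extends RuntimeException {
        StreamsException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private static final Logger LOG =
            Logger.getLogger(ClientCallWrapperSketch.class.getName());

    // Runs a client call, logs any runtime failure as an error right away,
    // and rethrows it wrapped in StreamsException.
    static <T> T callClient(String description, Supplier<T> call) {
        try {
            return call.get();
        } catch (IllegalStateException | IllegalArgumentException e) {
            // These should only occur due to a bug, so ask the user to report it.
            LOG.log(Level.SEVERE, description + " failed unexpectedly; please report this as a bug", e);
            throw new StreamsException(description + " failed", e);
        } catch (RuntimeException e) {
            LOG.log(Level.SEVERE, description + " failed", e);
            throw new StreamsException(description + " failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callClient("lookup", () -> "ok"));
    }
}
```

Callers then only need to handle StreamsException, which is the point of the last paragraph of the ticket.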





[jira] [Resolved] (KAFKA-6124) Revisit default config for internal client with regard to resilience

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6124.

Resolution: Abandoned

We changed a couple of default values (also in consumer/producer/admin clients). 
This ticket is not needed any longer.

> Revisit default config for internal client with regard to resilience
> 
>
> Key: KAFKA-6124
> URL: https://issues.apache.org/jira/browse/KAFKA-6124
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip, streams-resilience
>
> We should reevaluate the default config of our internally used clients and 
> update them to make Streams more resilient out-of-the-box.
> For example:
>  - increase producer "retries"
>  - increase producer "max.block.ms"
>  - consider impact on max.poll.interval.ms (should we keep it at 
> Integer.MAX_VALUE -- note that KAFKA-5152 resolved the issue that made us set 
> it to infinity)
>  - double check all other defaults including {{KafkaAdminClient}}
> We should also document all findings in the docs and explain how users can 
> configure their application to be more resilient if they want to.
> This Jira requires a KIP.
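
Until defaults change, users can override the producer settings named above themselves. The following sketch builds such overrides with plain java.util.Properties. The "producer." prefix is how Streams routes a setting to its internal producer; the concrete values are placeholders for illustration, not recommended defaults, and the class name is hypothetical.

```java
import java.util.Properties;

final class ResilientStreamsConfigSketch {

    // Builds overrides for the internal producer of a Streams application.
    // Values are illustrative assumptions and need tuning per deployment.
    static Properties resilientOverrides() {
        Properties props = new Properties();
        props.put("producer.retries", "10");
        props.put("producer.max.block.ms", "120000");
        return props;
    }

    public static void main(String[] args) {
        resilientOverrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These entries would be merged into the Properties passed to the Streams application alongside the usual application id and bootstrap servers.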





[jira] [Commented] (KAFKA-6039) Improve TaskAssignor to be more load balanced

2020-05-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107852#comment-17107852
 ] 

Matthias J. Sax commented on KAFKA-6039:


[~ableegoldman] [~vvcephei] [~guozhang] I think we can close this? Should be 
covered by KIP-441?

> Improve TaskAssignor to be more load balanced
> -
>
> Key: KAFKA-6039
> URL: https://issues.apache.org/jira/browse/KAFKA-6039
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: optimization, user-experience
>
> Today our task placement may still generate sub-optimal assignments regarding 
> load balance. One reason is that it does not account for sub-topologies. For 
> example, say you have an aggregation following a repartition topic; then 
> you will end up with two sub-topologies where the first one is very light and 
> the second one is computationally heavy with state stores. However, when we 
> consider their tasks we treat them equally, so in the worst case one client 
> can get X tasks from the first sub-topology and be very idle while the 
> other gets X tasks from the second sub-topology and is busy to 
> death.
> One strawman approach to make this better is to try to achieve balance across 
> sub-topologies, i.e. each client tries to get a similar number of tasks within 
> a sub-topology. However, there are some more considerations to include (as 
> mentioned in the sub-tasks).
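
The strawman can be sketched as a round-robin that walks the tasks one sub-topology at a time, so each client ends up with a similar number of tasks from every sub-topology. This is an illustration only, not the actual TaskAssignor: the class name and data shapes are hypothetical, and state-store locality and standby tasks are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SubTopologyBalanceSketch {

    // Assigns tasks round-robin, sub-topology by sub-topology, so that per
    // sub-topology the task counts of any two clients differ by at most one.
    static Map<String, List<String>> assign(Map<Integer, List<String>> tasksBySubTopology,
                                            List<String> clients) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("at least one client is required");
        }
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String client : clients) {
            assignment.put(client, new ArrayList<>());
        }
        int next = 0;
        for (List<String> tasks : tasksBySubTopology.values()) {
            for (String task : tasks) {
                assignment.get(clients.get(next % clients.size())).add(task);
                next++;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> tasks = new LinkedHashMap<>();
        tasks.put(0, Arrays.asList("0_0", "0_1", "0_2", "0_3")); // light sub-topology
        tasks.put(1, Arrays.asList("1_0", "1_1", "1_2", "1_3")); // heavy, stateful sub-topology
        System.out.println(assign(tasks, Arrays.asList("client-a", "client-b")));
    }
}
```

With two clients and four tasks per sub-topology, each client receives two light and two heavy tasks instead of one client receiving all the heavy ones.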





[jira] [Resolved] (KAFKA-6000) streams 0.10.2.1 - kafka 0.11.0.1 state restore not working

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6000.

Resolution: Abandoned

> streams 0.10.2.1 - kafka 0.11.0.1 state restore not working
> ---
>
> Key: KAFKA-6000
> URL: https://issues.apache.org/jira/browse/KAFKA-6000
> Project: Kafka
>  Issue Type: Bug
>  Components: core, streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Bart Vercammen
>Priority: Blocker
> Attachments: correct-restore.log, failed-restore.log
>
>
> Potential interop issue between Kafka Streams (0.10.2.1) and Kafka (0.11.0.1)
> {noformat}
> 11:24:16.416 [StreamThread-3] DEBUG rocessorStateManager - task [0_2] 
> Registering state store lateststate to its state manager 
> 11:24:16.472 [StreamThread-3] TRACE rocessorStateManager - task [0_2] 
> Restoring state store lateststate from changelog topic 
> scratch.lateststate.dsh 
> 11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset 
> for partition scratch.lateststate.dsh-2 to latest offset. 
> 11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Partition 
> scratch.lateststate.dsh-2 is unknown for fetching offset, wait for metadata 
> refresh 
> 11:24:16.474 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending 
> ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
> partitionTimestamps={scratch.lateststate.dsh-2=-1}, minVersion=0) to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.476 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 
> ListOffsetResponse 
> {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=1773763}]}]}
>  from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.476 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling 
> ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 
> 1773763, timestamp -1 
> 11:24:16.477 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset 
> for partition scratch.lateststate.dsh-2 to earliest offset. 
> 11:24:16.478 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending 
> ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
> partitionTimestamps={scratch.lateststate.dsh-2=-2}, minVersion=0) to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.480 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 
> ListOffsetResponse 
> {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.481 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling 
> ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 0, 
> timestamp -1 
> 11:24:16.483 [StreamThread-3] DEBUG rocessorStateManager - restoring 
> partition scratch.lateststate.dsh-2 from offset 0 to endOffset 1773763 
> 11:24:16.484 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 0 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.485 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.486 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.490 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 0 to buffered 
> record list 
> 11:24:16.492 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 3 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 0 
> 11:24:16.493 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Returning fetched 
> records at offset 0 for assigned partition scratch.lateststate.dsh-2 and 
> update position to 1586527 
> 11:24:16.494 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Ignoring fetched 
> records for scratch.lateststate.dsh-2 at offset 0 since the current position 
> is 1586527 
> 11:24:16.496 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.496 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.498 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition 

[jira] [Resolved] (KAFKA-5625) Invalid subscription data may cause streams app to throw BufferUnderflowException

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5625.

Resolution: Abandoned

> Invalid subscription data may cause streams app to throw 
> BufferUnderflowException
> -
>
> Key: KAFKA-5625
> URL: https://issues.apache.org/jira/browse/KAFKA-5625
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Xavier Léauté
>Priority: Minor
>
> I was able to cause my streams app to crash with the following error when 
> attempting to join the same consumer group with a rogue client.
> At the very least I would expect streams to throw a 
> {{TaskAssignmentException}} to indicate invalid subscription data.
> {code}
> java.nio.BufferUnderflowException
> at java.nio.Buffer.nextGetIndex(Buffer.java:506)
> at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:361)
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:97)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:302)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:365)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:512)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:462)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:445)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:798)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:778)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:574)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:545)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
> {code}
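
A defensive decode along the lines the reporter asks for might look like the sketch below. It is plain Java with a local stand-in for TaskAssignmentException, and it assumes that the subscription data starts with a 4-byte integer field, which the getInt frame in the stack trace suggests but the ticket does not spell out. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

final class SubscriptionDecodeSketch {

    // Local stand-in so the sketch compiles without Kafka on the classpath.
    static final class TaskAssignmentException extends RuntimeException {
        TaskAssignmentException(String message) {
            super(message);
        }
    }

    // Reads the leading int only after checking that enough bytes are present,
    // turning a BufferUnderflowException into a descriptive error.
    static int decodeLeadingInt(ByteBuffer data) {
        int available = data == null ? 0 : data.remaining();
        if (available < Integer.BYTES) {
            throw new TaskAssignmentException(
                    "Invalid subscription data: expected at least " + Integer.BYTES
                            + " bytes but got " + available);
        }
        return data.getInt();
    }

    public static void main(String[] args) {
        ByteBuffer valid = ByteBuffer.allocate(Integer.BYTES).putInt(0, 3);
        System.out.println(decodeLeadingInt(valid));
    }
}
```

A rogue client that sends too few bytes would then surface as an explicit assignment error rather than a low-level buffer exception.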





[jira] [Resolved] (KAFKA-5581) Avoid creating changelog topics for state stores that are materialized from a source topic

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5581.

Resolution: Fixed

> Avoid creating changelog topics for state stores that are materialized from a 
> source topic
> --
>
> Key: KAFKA-5581
> URL: https://issues.apache.org/jira/browse/KAFKA-5581
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>Priority: Major
>  Labels: architecture, performance
>
> Today Streams backs every state store with a changelog topic by 
> default unless the user overrides this with {{disableLogging}} when creating the 
> state store / materializing the KTable. However, there are a few cases where a 
> separate changelog topic is not required because an existing topic can be 
> re-used instead. This issue covers two such cases:
> 1) If a KTable is read directly from a source topic and is materialized, i.e. 
> {code}
> table1 = builder.table("topic1", "store1");
> {code}
> In this case {{table1}}'s changelog topic can just be {{topic1}}, and we do 
> not need to create a separate {{table1-changelog}} topic.
> 2) If a KStream is materialized for a join where both streams are read 
> directly from a topic, e.g.:
> {code}
> stream1 = builder.stream("topic1");
> stream2 = builder.stream("topic2");
> // stream1 and stream2 are materialized with a changelog topic
> stream3 = stream1.join(stream2, windows);
> {code}
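
The proposal in case 1) can be sketched as a small decision function. This is only an illustration under assumptions: `ChangelogTopicResolver` and `changelogTopicFor` are hypothetical names, not Kafka Streams API. The only thing taken from Kafka Streams is the default internal topic naming of `<application.id>-<storeName>-changelog`.

```java
import java.util.Optional;

// Hypothetical helper illustrating the proposal; this is NOT Kafka Streams code.
final class ChangelogTopicResolver {
    private ChangelogTopicResolver() {}

    /**
     * Returns the topic a store would be restored from.
     *
     * @param applicationId the Streams application.id
     * @param storeName     the state store name
     * @param sourceTopic   the source topic if the store is materialized
     *                      directly from one, otherwise empty
     */
    static String changelogTopicFor(String applicationId,
                                    String storeName,
                                    Optional<String> sourceTopic) {
        // Re-use the source topic when there is one; otherwise fall back to
        // a dedicated "<application.id>-<storeName>-changelog" topic.
        return sourceTopic.orElseGet(
                () -> applicationId + "-" + storeName + "-changelog");
    }
}
```

With this sketch, a store materialized from `topic1` resolves to `topic1`, while a store with no source topic resolves to a dedicated changelog topic.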





[jira] [Commented] (KAFKA-5578) Streams Task Assignor should consider the staleness of state directories when allocating tasks

2020-05-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107848#comment-17107848
 ] 

Matthias J. Sax commented on KAFKA-5578:


[~ableegoldman] [~cadonna] [~vvcephei] Does KIP-441 cover this already?

> Streams Task Assignor should consider the staleness of state directories when 
> allocating tasks
> --
>
> Key: KAFKA-5578
> URL: https://issues.apache.org/jira/browse/KAFKA-5578
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Damian Guy
>Priority: Major
>
> During task assignment we use the presence of a state directory to decide 
> which instances should be assigned the task. We first choose 
> previous active tasks, but then fall back to the existence of a state dir. 
> Unfortunately we don't take into account the recency of the data in the 
> available state dirs. So in the case where a task has run on many instances, 
> we may choose an instance that has relatively old data.
> When doing task assignment we should take into consideration the age of the 
> data in the state dirs. We could use the data from the checkpoint files to 
> determine which instance is most up-to-date and attempt to assign accordingly 
> (obviously making sure that tasks are still balanced across available 
> instances).
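
A minimal sketch of the idea above, assuming each instance reports the last checkpointed changelog offset for a task. `FreshestStatePicker` and its input map are hypothetical and are not the Streams assignor; the sketch also ignores the balancing constraint mentioned in the description.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Kafka Streams: prefers the instance whose
// checkpointed offset for a task is highest, i.e. whose local state is freshest.
final class FreshestStatePicker {
    private FreshestStatePicker() {}

    /**
     * @param checkpointedOffsets instance id -> last checkpointed offset for
     *                            one task (assumed to come from checkpoint files)
     * @return the instance with the largest offset, or empty if none has state
     */
    static Optional<String> pick(Map<String, Long> checkpointedOffsets) {
        String best = null;
        long bestOffset = Long.MIN_VALUE;
        for (Map.Entry<String, Long> entry : checkpointedOffsets.entrySet()) {
            // Keep the first instance seen, then replace it only on a strictly
            // larger offset; ties therefore depend on map iteration order.
            if (best == null || entry.getValue() > bestOffset) {
                best = entry.getKey();
                bestOffset = entry.getValue();
            }
        }
        return Optional.ofNullable(best);
    }
}
```

A real assignor would have to combine this preference with load balancing, for example by using the offset only as a tie-breaker among instances that are below their task quota.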





[jira] [Resolved] (KAFKA-5393) in-memory state store memory issue

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5393.

Resolution: Abandoned

> in-memory state store memory issue 
> ---
>
> Key: KAFKA-5393
> URL: https://issues.apache.org/jira/browse/KAFKA-5393
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Kevin Chen
>Priority: Major
> Attachments: Screen Shot 2017-06-06 at 9.45.42 AM.png
>
>
> We are running 2 Kafka Streams instances that use the low-level Processor API 
> with in-memory state stores.
> When we upgrade instance A, all the tasks move to instance B, and they are 
> re-balanced between the two once A is back up and running. 
> The problem is that, even after the re-balance, the memory usage of instance A 
> does not drop to the previous level (the load is about the same).
> See the screenshot below.





[jira] [Resolved] (KAFKA-5240) Make persistent checkpointedOffsets optionable

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5240.

Resolution: Fixed

> Make persistent checkpointedOffsets optionable
> --
>
> Key: KAFKA-5240
> URL: https://issues.apache.org/jira/browse/KAFKA-5240
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>Priority: Minor
>
> Looking at the ProcessorStateManager class, the Kafka Streams library tries to 
> persist the current offset for each partition to a file. It has to use a locking 
> mechanism to be sure that no other running thread/task is 
> modifying the .checkpoint file. It does that even if you don't use a persistent 
> store in your topology (which is a bit confusing).
> From my understanding this is done to make active state 
> restoration faster by not seeking from the beginning 
> (ProcessorStateManager:217).
> We actually run everything in a Docker environment and we don't restart our 
> microservices; we just run another Docker container and delete the old one. 
> We don't use persistent stores and we don't want our microservices to 
> write anything to the filesystem. 
> We always set an aggressive [compact, delete] policy on the Kafka Streams 
> internal topics to have them compacted as much as possible, and therefore we 
> don't need fast recovery either: we always have to replay the whole topic no 
> matter what. 
> Would it be possible to make writing to the file system optional?
> Thanks!
> L.





[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425521117



##
File path: docs/security.html
##
@@ -767,7 +767,7 @@ 7.3 
Authentication using SASL
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --describe --entity-type users --entity-name alice
 
-Credentials may be deleted for one or more SCRAM mechanisms using 
the --delete option:
+Credentials may be deleted for one or more SCRAM mechanisms using 
the --alter --delete-config option:
 
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type 
users --entity-name alice

Review comment:
   There's no `--delete` option. And because the previous sample 
explained the `--describe` option usage, we should also state `--alter` here 
explicitly for the user, not just `--delete-config`.









[jira] [Commented] (KAFKA-4996) Fix findbugs multithreaded correctness warnings for streams

2020-05-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107845#comment-17107845
 ] 

Matthias J. Sax commented on KAFKA-4996:


Is this still an issue?

> Fix findbugs multithreaded correctness warnings for streams
> ---
>
> Key: KAFKA-4996
> URL: https://issues.apache.org/jira/browse/KAFKA-4996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Colin McCabe
>Priority: Major
>  Labels: newbie
>
> Fix findbugs multithreaded correctness warnings for streams
> {code}
> Multithreaded correctness Warnings
> Code  Warning
> AT    Sequence of calls to java.util.concurrent.ConcurrentHashMap may not be atomic in org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(long, ProcessorContext)
> IS    Inconsistent synchronization of org.apache.kafka.streams.KafkaStreams.stateListener; locked 66% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.processor.internals.StreamThread.stateListener; locked 66% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.processor.TopologyBuilder.applicationId; locked 50% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingKeyValueStore.context; locked 66% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.cache; locked 60% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.context; locked 66% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.name; locked 60% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.serdes; locked 70% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.RocksDBStore.db; locked 63% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.RocksDBStore.serdes; locked 76% of time
> {code}
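
For readers unfamiliar with the AT warning above, the following sketch shows the general pattern it flags and one common remedy. It assumes a simplified `SegmentRegistry` class that is a hypothetical stand-in, not the actual `Segments` implementation, and it does not claim to be the fix that was applied in Kafka.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a segment registry; not Kafka's Segments class.
final class SegmentRegistry {
    private final ConcurrentHashMap<Long, String> segments = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // Pattern the warning describes: get() and put() are each thread-safe,
    // but the sequence is not atomic, so two threads can both observe
    // "absent" and both create a segment for the same id.
    String getOrCreateRacy(long segmentId) {
        String segment = segments.get(segmentId);
        if (segment == null) {
            segment = newSegment(segmentId);
            segments.put(segmentId, segment);
        }
        return segment;
    }

    // One possible remedy: ConcurrentHashMap.computeIfAbsent performs the
    // lookup and the creation atomically for a given key.
    String getOrCreateAtomic(long segmentId) {
        return segments.computeIfAbsent(segmentId, this::newSegment);
    }

    private String newSegment(long segmentId) {
        created.incrementAndGet();
        return "segment-" + segmentId;
    }

    // Number of segments actually constructed, useful for spotting duplicates.
    int createdCount() {
        return created.get();
    }
}
```

Whether the racy form is actually a bug depends on whether the method can be called from more than one thread; if access is confined to a single thread, the warning is a false positive.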





[jira] [Resolved] (KAFKA-5071) ERROR StreamThread:783 StreamThread-128 - stream-thread [StreamThread-128] Failed to commit StreamTask 0_304 state: org.apache.kafka.streams.errors.ProcessorStateExcept

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5071.

Resolution: Abandoned

> ERROR StreamThread:783 StreamThread-128 - stream-thread [StreamThread-128] 
> Failed to commit StreamTask 0_304 state:  
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_304] Failed 
> to flush state store fmdbt 
> -
>
> Key: KAFKA-5071
> URL: https://issues.apache.org/jira/browse/KAFKA-5071
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux
>Reporter: Dhana
>Priority: Major
> Attachments: RocksDB_Issue_commitFailedonFlush.7z
>
>
> Scenario: we use two consumers (application -puse10) on different machines,
> with 400 partitions and 200 streams/consumer.
> config:
> bootstrap.servers=10.16.34.29:9092,10.16.35.134:9092,10.16.38.27:9092
> zookeeper.connect=10.16.34.29:2181,10.16.35.134:2181,10.16.38.27:2181
> num.stream.threads=200
> pulse.per.pdid.count.enable=false
> replication.factor=2
> state.dir=/opt/rocksdb
> max.poll.records=50
> session.timeout.ms=18
> request.timeout.ms=502
> max.poll.interval.ms=500
> fetch.max.bytes=102400
> max.partition.fetch.bytes=102400
> heartbeat.interval.ms = 6
> Logs - attached.
> Error:
> 2017-04-11 18:18:45.170 INFO  VehicleEventsStreamProcessor:219 StreamThread-32 - Current size of Treemap is 4 for pdid skga11041730gedvcl2pdid2236
> 2017-04-11 18:18:45.170 INFO  VehicleEventsStreamProcessor:245 StreamThread-32 - GE to be processed pdid skga11041730gedvcl2pdid2236 and uploadTimeStamp 2017-04-11 17:46:06.883
> 2017-04-11 18:18:45.175 INFO  VehicleEventsStreamProcessor:179 StreamThread-47 - Arrived GE uploadTimestamp 2017-04-11 17:46:10.911 pdid skga11041730gedvcl2pdid2290
> 2017-04-11 18:18:45.176 INFO  VehicleEventsStreamProcessor:219 StreamThread-47 - Current size of Treemap is 4 for pdid skga11041730gedvcl2pdid2290
> 2017-04-11 18:18:45.176 INFO  VehicleEventsStreamProcessor:245 StreamThread-47 - GE to be processed pdid skga11041730gedvcl2pdid2290 and uploadTimeStamp 2017-04-11 17:46:06.911
> 2017-04-11 18:18:45.571 INFO  StreamThread:737 StreamThread-128 - stream-thread [StreamThread-128] Committing all tasks because the commit interval 3ms has elapsed
> 2017-04-11 18:18:45.571 INFO  StreamThread:775 StreamThread-128 - stream-thread [StreamThread-128] Committing task StreamTask 0_304
> 2017-04-11 18:18:45.574 ERROR StreamThread:783 StreamThread-128 - stream-thread [StreamThread-128] Failed to commit StreamTask 0_304 state: 
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_304] Failed to flush state store fmdbt
>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:764)
>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store fmdbt
>   at com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flushInternal(HarmanRocksDBStore.java:353)
>   at com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flush(HarmanRocksDBStore.java:342)
>   at com.harman.analytics.stream.base.stores.HarmanPersistentKVStore.flush(HarmanPersistentKVStore.java:72)
>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
>   ... 8 more
> Caused by: org.rocksdb.RocksDBException: N
>   at org.rocksdb.RocksDB.flush(Native Method)
>   at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>   at com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flushInternal(HarmanRocksDBStore.java:351)
>   ... 11 more
> 2017-04-11 18:18:45.583 INFO  StreamThread:397 

[GitHub] [kafka] showuon edited a comment on pull request #8622: MINOR: Update stream documentation

2020-05-14 Thread GitBox


showuon edited a comment on pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#issuecomment-628563253


   Hi @bbejeck, I appended more fixes for the streams documents while 
reading them. Please review when available. Thank you.







[jira] [Resolved] (KAFKA-4934) Add streams test with RocksDb failures

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4934.

Resolution: Abandoned

> Add streams test with RocksDb failures
> --
>
> Key: KAFKA-4934
> URL: https://issues.apache.org/jira/browse/KAFKA-4934
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Priority: Major
>
> We need to add either integration or system tests with RocksDb failing 
> underneath and fix any problems that occur, including deadlocks, wrong 
> exceptions, etc.





[jira] [Resolved] (KAFKA-4887) Enabling caching on a persistent window store breaks support for duplicate insertion

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4887.

Resolution: Fixed

> Enabling caching on a persistent window store breaks support for duplicate 
> insertion
> 
>
> Key: KAFKA-4887
> URL: https://issues.apache.org/jira/browse/KAFKA-4887
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Elias Levy
>Priority: Major
>
> {{CachingWindowStore}} and {{RocksDBWindowStore}} interact badly when 
> duplicate insertion support is enabled by passing {{true}} as the fourth 
> argument to {{windowed}} in the state store supplier.
> When the feature is enabled, {{RocksDBWindowStore}} correctly handles 
> duplicates by assigning a unique sequence number to each element on insertion 
> and using the number within the key.
> When caching is enabled by calling {{enableCaching}} on the supplier, 
> {{CachingWindowStore}} fails to do the same. Thus, of multiple values 
> inserted with the same key, only the last one survives.
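
The difference described above can be modelled in a few lines. This is a simplified sketch, not the real `RocksDBWindowStore` or `CachingWindowStore`: both inner classes, the `@` separator, and the string keys are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the two behaviours; these are not Kafka's classes.
final class DuplicateRetentionSketch {
    private DuplicateRetentionSketch() {}

    // Retains duplicates: every insert gets a unique sequence number that
    // becomes part of the stored key, so equal user keys never collide.
    static final class SequencedStore {
        private final Map<String, String> entries = new LinkedHashMap<>();
        private int seqnum = 0;

        void put(String key, String value) {
            entries.put(key + "@" + seqnum++, value);
        }

        List<String> fetch(String key) {
            List<String> values = new ArrayList<>();
            for (Map.Entry<String, String> entry : entries.entrySet()) {
                String stored = entry.getKey();
                // The sequence number is digits only, so the last '@' is
                // always the separator between user key and sequence number.
                String userKey = stored.substring(0, stored.lastIndexOf('@'));
                if (userKey.equals(key)) {
                    values.add(entry.getValue());
                }
            }
            return values;
        }
    }

    // Loses duplicates: the cache is keyed by the user key alone, so a
    // second put with the same key overwrites the first.
    static final class KeyOnlyCache {
        private final Map<String, String> entries = new HashMap<>();

        void put(String key, String value) {
            entries.put(key, value);
        }

        List<String> fetch(String key) {
            List<String> values = new ArrayList<>();
            if (entries.containsKey(key)) {
                values.add(entries.get(key));
            }
            return values;
        }
    }
}
```

Inserting two values under the same key yields both values from `SequencedStore` but only the last one from `KeyOnlyCache`, which mirrors the behaviour reported in this ticket.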





[GitHub] [kafka] showuon removed a comment on pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon removed a comment on pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#issuecomment-628403219


   Hi @kkonstantine , could you please review this small PR? Thanks.







[GitHub] [kafka] showuon commented on pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#issuecomment-628981965


   Hi @ijuma @kkonstantine @mjsax, I've added more fixes to the documentation 
while reading it. Please help review it when available. Thank you.







[jira] [Resolved] (KAFKA-4778) OOM on kafka-streams instances with high numbers of unreaped Record classes

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4778.

Resolution: Abandoned

Does not seem to be an issue in newer releases any more?

> OOM on kafka-streams instances with high numbers of unreaped Record classes
> ---
>
> Key: KAFKA-4778
> URL: https://issues.apache.org/jira/browse/KAFKA-4778
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, streams
>Affects Versions: 0.10.1.1
> Environment: AWS m3.large Ubuntu 16.04.1 LTS.  rocksDB on local SSD.  
> Kafka has 3 zk, 5 brokers.  
> stream processors are run with:
> -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails
> Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
> Java HotSpot(TM) 64-Bit Server VM (build 25.101-b13, mixed mode)
> Stream processors written in scala 2.11.8
>Reporter: Dave Thomas
>Priority: Major
> Attachments: oom-killer.txt
>
>
> We have a stream processing app with ~8 source/sink stages operating roughly 
> at the rate of 500k messages ingested/day (~4M across the 8 stages).
> We get OOM eruptions once every ~18 hours. Note it is Linux triggering the 
> OOM-killer, not the JVM terminating itself. 
> It may be worth noting that stream processing uses ~50 mbytes while 
> processing normally for hours on end, until the problem surfaces; then in 
> ~20-30 sec memory grows suddenly from under 100 mbytes to >1 gig and does not 
> shrink until the process is killed.
> We are using supervisor to restart the instances.  Sometimes, the process 
> dies immediately once stream processing resumes for the same reason, a 
> process which could continue for minutes or hours.  This extended window has 
> enabled us to capture a heap dump using jmap.
> jhat's histogram feature reveals the following top objects in memory:
> Class  Instance Count  Total Size
> class [B  4070487  867857833
> class [Ljava.lang.Object;  2066986  268036184
> class [C  539519  92010932
> class [S  1003290  80263028
> class [I  508208  77821516
> class java.nio.HeapByteBuffer  1506943  58770777
> class org.apache.kafka.common.record.Record  1506783  36162792
> class org.apache.kafka.clients.consumer.ConsumerRecord  528652  35948336
> class org.apache.kafka.common.record.MemoryRecords$RecordsIterator  501742  32613230
> class org.apache.kafka.common.record.LogEntry  2009373  32149968
> class org.xerial.snappy.SnappyInputStream  501600  20565600
> class java.io.DataInputStream  501742  20069680
> class java.io.EOFException  501606  20064240
> class java.util.ArrayDeque  501941  8031056
> class java.lang.Long  516463  4131704
> Note that the classes high on the list include 
> org.apache.kafka.common.record.Record, 
> org.apache.kafka.clients.consumer.ConsumerRecord,
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator, and
> org.apache.kafka.common.record.LogEntry.
> All of these have 500k-1.5M instances.
> There is nothing in stream processing logs that is distinctive (log levels 
> are still at default).
> Could it be references (weak, phantom, etc) causing these instances to not be 
> garbage-collected?
> Edit: to request a full heap dump (created using `jmap 
> -dump:format=b,file=`), contact me directly at opensou...@peoplemerge.com.  
> It is 2G.





[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425518608



##
File path: docs/connect.html
##
@@ -129,9 +129,11 @@ Transformations
 The file source connector reads each line as a String. We will wrap 
each line in a Map and then add a second field to identify the origin of the 
event. To do this, we use two transformations:
 
 HoistField to place the input line inside a Map
-InsertField to add the static field. In this example we'll 
indicate that the record came from a file connector
+InsertField to add the static field
 
 
+In this example we'll indicate that the record came from a file 
connector.

Review comment:
   fix the wrong format. 
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002171-b044a580-968f-11ea-8691-bb329c62cd56.png)
   
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82002116-85f2e800-968f-11ea-8d5f-4de48b5ddef1.png)
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425522378



##
File path: docs/security.html
##
@@ -1193,7 +1193,7 @@ 7.3 Authentication using SASL
 7.4 Authorization and ACLs
 Kafka ships with a pluggable Authorizer and an out-of-box authorizer 
implementation that uses zookeeper to store all the acls. The Authorizer is 
configured by setting authorizer.class.name in server.properties. To 
enable the out of the box implementation use:
 authorizer.class.name=kafka.security.authorizer.AclAuthorizer
-Kafka acls are defined in the general format of "Principal P is 
[Allowed/Denied] Operation O From Host H on any Resource R matching 
ResourcePattern RP". You can read more about the acl structure in KIP-11 and 
resource patterns in KIP-290. In order to add, remove or list acls you can use 
the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific 
Resource R, then R has no associated acls, and therefore no one other than 
super users is allowed to access R. If you want to change that behavior, you 
can include the following in server.properties.
+Kafka acls are defined in the general format of "Principal P is 
[Allowed/Denied] Operation O From Host H on any Resource R matching 
ResourcePattern RP". You can read more about the acl structure in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface">KIP-11</a>
 and resource patterns in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>.
 In order to add, remove or list acls you can use the Kafka authorizer CLI. By 
default, if no ResourcePatterns match a specific Resource R, then R has no 
associated acls, and therefore no one other than super users is allowed to 
access R. If you want to change that behavior, you can include the following in 
server.properties.

Review comment:
   In the documentation, we always put a hyperlink for the **KIP**. So I 
added them here.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002815-678dec00-9691-11ea-9ee7-a17f4e1b4f20.png)
   
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82002859-8b513200-9691-11ea-8ca0-080c03540cc2.png)
   
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r420534565



##
File path: docs/security.html
##
@@ -398,7 +398,7 @@ Host Name Verification
 ssl.keystore.password=test1234
 ssl.key.password=test1234
 
-Other configuration settings that may also be needed depending on 
our requirements and the broker configuration:
+Other configuration settings that may also be needed depending on 
requirements and the broker configuration:

Review comment:
   Remove the redundant `our` here
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82003391-d61f7980-9692-11ea-9f3e-0981af1aefe8.png)
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r420533008



##
File path: docs/ops.html
##
@@ -477,16 +477,20 @@ Limiting 
Bandwidth Usage during Da
   Throttle was removed.
 
   The administrator can also validate the assigned configs using the 
kafka-configs.sh. There are two pairs of throttle
-  configuration used to manage the throttling process. The throttle value 
itself. This is configured, at a broker
+  configuration used to manage the throttling process. This is configured, 
at a broker

Review comment:
   remove redundant sentence.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82003320-a40e1780-9692-11ea-867d-15678605944c.png)
   









[jira] [Resolved] (KAFKA-4742) ResetTool does not commit offsets correctly

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4742.

Resolution: Abandoned

This ticket only affects 0.10.0.x and 0.10.1.x and it's very unlikely that a new 
bug-fix release will be done.

> ResetTool does not commit offsets correctly
> ---
>
> Key: KAFKA-4742
> URL: https://issues.apache.org/jira/browse/KAFKA-4742
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Matthias J. Sax
>Priority: Minor
>
> {{StreamsResetter}} has two issues with regard to offset manipulation:
>  - it should commit offset "zero" for internal repartitioning topics (instead 
> of using `seekToBeginning()`, which is only correct for user input topics)
>  - it should not commit offsets for internal changelog topics
> Does not affect {{0.10.2}} because internal topics get deleted and as of 
> {{0.10.2}} any committed offsets get deleted automatically on topic deletion, 
> too.





[jira] [Resolved] (KAFKA-4697) Simplify Streams Reset Tool

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4697.

Resolution: Abandoned

This ticket only affects 0.10.0.x and 0.10.1.x versions and it's very unlikely 
that we will do a new bug-fix release.

> Simplify Streams Reset Tool
> ---
>
> Key: KAFKA-4697
> URL: https://issues.apache.org/jira/browse/KAFKA-4697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Currently, deleting topics does not delete committed offsets. Thus, the reset 
> tool needs to modify offsets of internal repartitioning topics that are 
> actually deleted.
> With KAFKA-2000 in place, this is not required anymore (ie, this ticket does 
> not apply to {{0.10.2.0}}.)





[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425523260



##
File path: docs/security.html
##
@@ -2158,23 +2158,23 @@ 7.6.2 Migrating cluste
 Perform a rolling restart of brokers setting the JAAS login file 
and/or defining ZooKeeper mutual TLS configurations (including connecting to 
the TLS-enabled ZooKeeper port) as required, which enables brokers to 
authenticate to ZooKeeper. At the end of the rolling restart, brokers are able 
to manipulate znodes with strict ACLs, but they will not create znodes with 
those ACLs
 If you enabled mTLS, disable the non-TLS port in ZooKeeper
 Perform a second rolling restart of brokers, this time setting the 
configuration parameter zookeeper.set.acl to true, which enables the 
use of secure ACLs when creating znodes
-Execute the ZkSecurityMigrator tool. To execute the tool, there is 
this script: ./bin/zookeeper-security-migration.sh with 
zookeeper.acl set to secure. This tool traverses the corresponding 
sub-trees changing the ACLs of the znodes. Use the --zk-tls-config-file 
file option if you enable mTLS.
+Execute the ZkSecurityMigrator tool. To execute the tool, there is 
this script: bin/zookeeper-security-migration.sh with 
zookeeper.acl set to secure. This tool traverses the corresponding 
sub-trees changing the ACLs of the znodes. Use the --zk-tls-config-file 
file option if you enable mTLS.

Review comment:
   In the documentation, when referring to `bin/xxx.sh`, we don't add a 
leading dot slash `./`. Fix it.









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425523260



##
File path: docs/security.html
##
@@ -2158,23 +2158,23 @@ 7.6.2 Migrating cluste
 Perform a rolling restart of brokers setting the JAAS login file 
and/or defining ZooKeeper mutual TLS configurations (including connecting to 
the TLS-enabled ZooKeeper port) as required, which enables brokers to 
authenticate to ZooKeeper. At the end of the rolling restart, brokers are able 
to manipulate znodes with strict ACLs, but they will not create znodes with 
those ACLs
 If you enabled mTLS, disable the non-TLS port in ZooKeeper
 Perform a second rolling restart of brokers, this time setting the 
configuration parameter zookeeper.set.acl to true, which enables the 
use of secure ACLs when creating znodes
-Execute the ZkSecurityMigrator tool. To execute the tool, there is 
this script: ./bin/zookeeper-security-migration.sh with 
zookeeper.acl set to secure. This tool traverses the corresponding 
sub-trees changing the ACLs of the znodes. Use the --zk-tls-config-file 
file option if you enable mTLS.
+Execute the ZkSecurityMigrator tool. To execute the tool, there is 
this script: bin/zookeeper-security-migration.sh with 
zookeeper.acl set to secure. This tool traverses the corresponding 
sub-trees changing the ACLs of the znodes. Use the --zk-tls-config-file 
file option if you enable mTLS.

Review comment:
   In the documentation, references to `bin/xxx.sh` scripts do not start with a 
dot slash `./`.









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425522820



##
File path: docs/security.html
##
@@ -1438,7 +1438,7 @@ Examples
 bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:Bob --allow-principal User:Alice --allow-host 
198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write 
--topic Test-topic
 By default, all principals that don't have an explicit acl that 
allows access for an operation to a resource are denied. In rare cases where an 
allow acl is defined that allows access to all but some principal we will have 
to use the --deny-principal and --deny-host option. For example, if we want to 
allow all users to Read from Test-topic but only deny User:BadBob from IP 
198.51.100.3 we can do so using following commands:
 bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:* --allow-host * --deny-principal User:BadBob 
--deny-host 198.51.100.3 --operation Read --topic Test-topic
-Note that ``--allow-host`` and ``deny-host`` only support IP 
addresses (hostnames are not supported).
+Note that --allow-host and --deny-host 
only support IP addresses (hostnames are not supported).

Review comment:
   No other place in the documentation uses the ` `` ` format. 
Replace it with `<code>` formatting here.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002937-b9367680-9691-11ea-9588-be3628f9c340.png)
   
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82002970-ce130a00-9691-11ea-8ffc-8ed41b3a55a4.png)
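
The allow/deny behavior described in the diff context above can be sketched in Java. This is a minimal illustration, assuming that a matching deny rule takes precedence over any allow rule and that a request with no matching rule is denied. `AclSketch` and `Rule` are hypothetical names for this example and are not Kafka's authorizer classes; `User:*` and host `*` are treated as wildcards.

```java
import java.util.Arrays;
import java.util.List;

class AclSketch {
    // Hypothetical model of one ACL entry; not a Kafka type.
    static final class Rule {
        final String principal, host, operation, topic;
        final boolean allow;

        Rule(String principal, String host, String operation, String topic, boolean allow) {
            this.principal = principal;
            this.host = host;
            this.operation = operation;
            this.topic = topic;
            this.allow = allow;
        }

        boolean matches(String p, String h, String op, String t) {
            boolean principalOk = principal.equals("User:*") || principal.equals(p);
            boolean hostOk = host.equals("*") || host.equals(h);
            return principalOk && hostOk && operation.equals(op) && topic.equals(t);
        }
    }

    // Default is deny; a matching deny rule wins over any matching allow rule.
    static boolean isAllowed(List<Rule> rules, String p, String h, String op, String t) {
        boolean allowed = false;
        for (Rule rule : rules) {
            if (!rule.matches(p, h, op, t)) {
                continue;
            }
            if (!rule.allow) {
                return false;
            }
            allowed = true;
        }
        return allowed;
    }

    // Rules corresponding to the second kafka-acls.sh example in the diff context.
    static List<Rule> exampleRules() {
        return Arrays.asList(
            new Rule("User:*", "*", "Read", "Test-topic", true),
            new Rule("User:BadBob", "198.51.100.3", "Read", "Test-topic", false));
    }
}
```

With these rules, `AclSketch.isAllowed(AclSketch.exampleRules(), "User:BadBob", "198.51.100.3", "Read", "Test-topic")` is false, while the same call for any other user or host is true.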
   

##
File path: docs/security.html
##
@@ -1451,7 +1451,7 @@ Examples
 Removing Acls
 Removing acls is pretty much the same. The only difference is 
instead of --add option users will have to specify --remove option. To remove 
the acls added by the first example above we can execute the CLI with following 
options:
  bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --remove 
--allow-principal User:Bob --allow-principal User:Alice --allow-host 
198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write 
--topic Test-topic 
-If you wan to remove the acl added to the prefixed resource 
pattern above we can execute the CLI with following options:
+If you want to remove the acl added to the prefixed resource 
pattern above we can execute the CLI with following options:

Review comment:
   fix typo.









[jira] [Updated] (KAFKA-4327) Move Reset Tool from core to streams

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4327:
---
Fix Version/s: 3.0.0

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, the Kafka Streams Application Reset Tool is part of the {{core}} module 
> due to its ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to the {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the Reset Tool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.class.getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support "topic delete request" and 
> thus, the reset tool will not be backward compatible with all broker versions.
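
As a sketch of the message change proposed in the description, deriving the tool name from the class keeps the exception text correct if the class is later moved to another package. The `StreamsResetter` class below is a stand-in declared only so the example compiles (the real one lives in `kafka.tools`), and `ResetHint` is a hypothetical helper. A class literal is used because `getClass()` needs an instance.

```java
// Stand-in for kafka.tools.StreamsResetter, declared here only for this sketch.
class StreamsResetter { }

class ResetHint {
    // Builds the hint from the class literal, so a package move updates the text automatically.
    static String message() {
        return "Use '" + StreamsResetter.class.getName() + "' tool";
    }
}
```

`ResetHint.message()` then contains whatever fully qualified name the class has at compile time, rather than a hard-coded string.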



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425522820



##
File path: docs/security.html
##
@@ -1438,7 +1438,7 @@ Examples
 bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:Bob --allow-principal User:Alice --allow-host 
198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write 
--topic Test-topic
 By default, all principals that don't have an explicit acl that 
allows access for an operation to a resource are denied. In rare cases where an 
allow acl is defined that allows access to all but some principal we will have 
to use the --deny-principal and --deny-host option. For example, if we want to 
allow all users to Read from Test-topic but only deny User:BadBob from IP 
198.51.100.3 we can do so using following commands:
 bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:* --allow-host * --deny-principal User:BadBob 
--deny-host 198.51.100.3 --operation Read --topic Test-topic
-Note that ``--allow-host`` and ``deny-host`` only support IP 
addresses (hostnames are not supported).
+Note that --allow-host and --deny-host 
only support IP addresses (hostnames are not supported).

Review comment:
   No other place in the documentation uses the ` `` ` format. 
Replace it with `<code>` formatting here.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002937-b9367680-9691-11ea-9588-be3628f9c340.png)
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82002970-ce130a00-9691-11ea-8ffc-8ed41b3a55a4.png)
   









[jira] [Updated] (KAFKA-4327) Move Reset Tool from core to streams

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4327:
---
Priority: Blocker  (was: Minor)

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: needs-kip
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, the Kafka Streams Application Reset Tool is part of the {{core}} module 
> due to its ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to the {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the Reset Tool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.class.getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support "topic delete request" and 
> thus, the reset tool will not be backward compatible with all broker versions.





[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425522820



##
File path: docs/security.html
##
@@ -1438,7 +1438,7 @@ Examples
 bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:Bob --allow-principal User:Alice --allow-host 
198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write 
--topic Test-topic
 By default, all principals that don't have an explicit acl that 
allows access for an operation to a resource are denied. In rare cases where an 
allow acl is defined that allows access to all but some principal we will have 
to use the --deny-principal and --deny-host option. For example, if we want to 
allow all users to Read from Test-topic but only deny User:BadBob from IP 
198.51.100.3 we can do so using following commands:
 bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:* --allow-host * --deny-principal User:BadBob 
--deny-host 198.51.100.3 --operation Read --topic Test-topic
-Note that ``--allow-host`` and ``deny-host`` only support IP 
addresses (hostnames are not supported).
+Note that --allow-host and --deny-host 
only support IP addresses (hostnames are not supported).

Review comment:
   There's no format like ` `` ` in the documentation anywhere else. 
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002937-b9367680-9691-11ea-9588-be3628f9c340.png)
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82002970-ce130a00-9691-11ea-8ffc-8ed41b3a55a4.png)
   









[jira] [Resolved] (KAFKA-7566) Add sidecar job to leader (or a random single follower) only

2020-05-14 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7566.

Resolution: Won't Fix

> Add sidecar job to leader (or a random single follower) only
> 
>
> Key: KAFKA-7566
> URL: https://issues.apache.org/jira/browse/KAFKA-7566
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Minor
>
> Hey there,
> recently we needed to add an archive job to a streaming application. The caveat 
> is that we need to make sure only one instance is doing this task to avoid 
> a potential race condition, and we also don't want to schedule it as a regular 
> stream task, since that would block normal streaming operation. 
> Although we could do so with a zk lease, I'm raising the case here since 
> this could be a potential use case for streaming jobs as well. For example, 
> there are some `leader specific` operations we could schedule in the DSL instead 
> of in an ad hoc manner.
> Let me know if you think this makes sense, thank you!





[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425522378



##
File path: docs/security.html
##
@@ -1193,7 +1193,7 @@ 7.3 
Authentication using SASL
 7.4 Authorization and ACLs
 Kafka ships with a pluggable Authorizer and an out-of-box authorizer 
implementation that uses zookeeper to store all the acls. The Authorizer is 
configured by setting authorizer.class.name in server.properties. To 
enable the out of the box implementation use:
 authorizer.class.name=kafka.security.authorizer.AclAuthorizer
-Kafka acls are defined in the general format of "Principal P is 
[Allowed/Denied] Operation O From Host H on any Resource R matching 
ResourcePattern RP". You can read more about the acl structure in KIP-11 and 
resource patterns in KIP-290. In order to add, remove or list acls you can use 
the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific 
Resource R, then R has no associated acls, and therefore no one other than 
super users is allowed to access R. If you want to change that behavior, you 
can include the following in server.properties.
+Kafka acls are defined in the general format of "Principal P is 
[Allowed/Denied] Operation O From Host H on any Resource R matching 
ResourcePattern RP". You can read more about the acl structure in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface">KIP-11</a>
 and resource patterns in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>.
 In order to add, remove or list acls you can use the Kafka authorizer CLI. By 
default, if no ResourcePatterns match a specific Resource R, then R has no 
associated acls, and therefore no one other than super users is allowed to 
access R. If you want to change that behavior, you can include the following in 
server.properties.

Review comment:
   In the documentation, we always hyperlink a **KIP** reference, so I 
added the links here.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002815-678dec00-9691-11ea-9ee7-a17f4e1b4f20.png)
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82002859-8b513200-9691-11ea-8ca0-080c03540cc2.png)
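
The "Resource R matching ResourcePattern RP" part of the format can be illustrated with a short Java sketch. It assumes the two pattern types described in KIP-290, literal and prefixed, plus a literal `*` that matches any resource name. `ResourcePatternSketch` and its `PatternType` enum are hypothetical names for this example, not Kafka's classes.

```java
class ResourcePatternSketch {
    // Hypothetical pattern kinds for this sketch.
    enum PatternType { LITERAL, PREFIXED }

    // Returns true if a pattern of the given type and name covers the resource name.
    static boolean matches(PatternType type, String patternName, String resourceName) {
        switch (type) {
            case LITERAL:
                // A literal pattern matches the exact name; "*" is treated as a wildcard.
                return patternName.equals("*") || patternName.equals(resourceName);
            case PREFIXED:
                // A prefixed pattern matches every resource whose name starts with the prefix.
                return resourceName.startsWith(patternName);
            default:
                return false;
        }
    }
}
```

Under these assumptions a prefixed pattern `Test-` covers `Test-topic`, while a literal pattern `Test-` does not.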
   
   









[jira] [Resolved] (KAFKA-3779) Add the LRU cache for KTable.to() operator

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3779.

Resolution: Won't Fix

`KTable#to()` was removed from the API.

> Add the LRU cache for KTable.to() operator
> --
>
> Key: KAFKA-3779
> URL: https://issues.apache.org/jira/browse/KAFKA-3779
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
>Priority: Major
>
> The KTable.to operator currently does not use a cache. We can add a cache to 
> this operator to deduplicate and reduce data traffic as well. This is to be 
> done after KAFKA-3777.





[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425521117



##
File path: docs/security.html
##
@@ -767,7 +767,7 @@ 7.3 
Authentication using SASL
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --describe --entity-type users --entity-name alice
 
-Credentials may be deleted for one or more SCRAM mechanisms using 
the --delete option:
+Credentials may be deleted for one or more SCRAM mechanisms using 
the --alter --delete-config option:
 
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type 
users --entity-name alice

Review comment:
   There's no `--delete` option. Because the previous sample explicitly shows 
the `--describe` option, we should also spell out `--alter` here for the user, 
not just `--delete-config`.









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425521117



##
File path: docs/security.html
##
@@ -767,7 +767,7 @@ 7.3 
Authentication using SASL
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --describe --entity-type users --entity-name alice
 
-Credentials may be deleted for one or more SCRAM mechanisms using 
the --delete option:
+Credentials may be deleted for one or more SCRAM mechanisms using 
the --alter --delete-config option:
 
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type 
users --entity-name alice

Review comment:
   There's no `--delete` option. Because the previous sample explicitly shows 
the `--describe` option, we should also spell out `--alter` here for the 
user.









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425521117



##
File path: docs/security.html
##
@@ -767,7 +767,7 @@ 7.3 
Authentication using SASL
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --describe --entity-type users --entity-name alice
 
-Credentials may be deleted for one or more SCRAM mechanisms using 
the --delete option:
+Credentials may be deleted for one or more SCRAM mechanisms using 
the --alter --delete-config option:
 
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type 
users --entity-name alice

Review comment:
   There's no `--delete` option. Because the previous sample explicitly shows 
the `--describe` option, we should also spell out `--alter` here for the user, 
not just `--delete-config`.









[jira] [Resolved] (KAFKA-10) Kafka deployment on EC2 should be WHIRR based, instead of current contrib/deploy code based solution

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10.
--
Resolution: Won't Fix  (was: Abandoned)

Apache Whirr has been retired.

> Kafka deployment on EC2 should be WHIRR based, instead of current 
> contrib/deploy code based solution
> 
>
> Key: KAFKA-10
> URL: https://issues.apache.org/jira/browse/KAFKA-10
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Affects Versions: 0.6
>Priority: Major
>
> Apache Whirr is a set of libraries for running cloud services:
> http://incubator.apache.org/whirr/ 
> It is desirable that Kafka's integration with EC2 be Whirr based, rather than 
> the code based solution we currently have in contrib/deploy. 
> The code in contrib/deploy will be deleted in the 0.6 release.





[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425521117



##
File path: docs/security.html
##
@@ -767,7 +767,7 @@ 7.3 
Authentication using SASL
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --describe --entity-type users --entity-name alice
 
-Credentials may be deleted for one or more SCRAM mechanisms using 
the --delete option:
+Credentials may be deleted for one or more SCRAM mechanisms using 
the --alter --delete-config option:
 
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type 
users --entity-name alice

Review comment:
   There's no `--delete` option. Updated to `--alter --delete-config`, 
because the previous sample explicitly shows the `--describe` option.









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425521117



##
File path: docs/security.html
##
@@ -767,7 +767,7 @@ 7.3 
Authentication using SASL
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --describe --entity-type users --entity-name alice
 
-Credentials may be deleted for one or more SCRAM mechanisms using 
the --delete option:
+Credentials may be deleted for one or more SCRAM mechanisms using 
the --alter --delete-config option:
 
 > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file 
zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type 
users --entity-name alice

Review comment:
   There's no `--delete` option. Updated to `--alter --delete-config`.









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425520647



##
File path: docs/connect.html
##
@@ -309,7 +311,7 @@ Connector
 }
 
 
-We will define the FileStreamSourceTask class below. Next, 
we add some standard lifecycle methods, start() and 
stop():
+We will define the FileStreamSourceTask class below. Next, 
we add some standard lifecycle methods, start() and 
stop():

Review comment:
   Fix the wrong HTML formatting of the colon `:`.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002522-8d66c100-9690-11ea-8df3-8871569d188c.png)
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425519960



##
File path: docs/connect.html
##
@@ -174,7 +176,7 @@ Transformations
 InsertField - Add a field using either static data or record 
metadata
 ReplaceField - Filter or rename fields
 MaskField - Replace field with valid null value for the type (0, 
empty string, etc)
-ValueToKey
+ValueToKey - Replace the record key with a new key formed from a 
subset of fields in the record value

Review comment:
   Add a one-line description for the `ValueToKey` transformation to be 
consistent with the others. The description is copied from the detailed explanation 
below.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002437-5a243200-9690-11ea-8c8b-2862c5aecd69.png)
   
   description source:
   
![image](https://user-images.githubusercontent.com/43372967/82002385-38c34600-9690-11ea-8733-19ed835a5a3f.png)
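
To make the one-line description concrete, here is a minimal Java sketch of what "a new key formed from a subset of fields in the record value" means for a schemaless, map-based record. `ValueToKeySketch` is a hypothetical helper, not the Connect transformation itself, and rejecting a missing field is a choice made for this sketch only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ValueToKeySketch {
    // Builds a new key from the listed fields of the value; the value itself is left unchanged.
    static Map<String, Object> keyFromValue(Map<String, Object> value, List<String> fields) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String field : fields) {
            if (!value.containsKey(field)) {
                // Sketch-only behavior: fail fast on a field that is not in the value.
                throw new IllegalArgumentException("Field not found in value: " + field);
            }
            key.put(field, value.get(field));
        }
        return key;
    }
}
```

For a value with fields `id`, `name` and `city`, selecting `id` and `name` gives a two-field key and leaves the value with all three fields.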
   
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425519960



##
File path: docs/connect.html
##
@@ -174,7 +176,7 @@ Transformations
 InsertField - Add a field using either static data or record 
metadata
 ReplaceField - Filter or rename fields
 MaskField - Replace field with valid null value for the type (0, 
empty string, etc)
-ValueToKey
+ValueToKey - Replace the record key with a new key formed from a 
subset of fields in the record value

Review comment:
   Add a one-line description for the `ValueToKey` transformation to be 
consistent with the others. The description is copied from the detailed explanation 
below.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002297-04e82080-9690-11ea-9e1b-6cb218b0593c.png)
   
   description source:
   
![image](https://user-images.githubusercontent.com/43372967/82002385-38c34600-9690-11ea-8733-19ed835a5a3f.png)
   
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425518608



##
File path: docs/connect.html
##
@@ -129,9 +129,11 @@ Transformations
 The file source connector reads each line as a String. We will wrap 
each line in a Map and then add a second field to identify the origin of the 
event. To do this, we use two transformations:
 
 HoistField to place the input line inside a Map
-InsertField to add the static field. In this example we'll 
indicate that the record came from a file connector
+InsertField to add the static field
 
 
+In this example we'll indicate that the record came from a file 
connector.

Review comment:
   fix the wrong format. 
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002171-b044a580-968f-11ea-8691-bb329c62cd56.png)
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82002116-85f2e800-968f-11ea-8d5f-4de48b5ddef1.png)
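
The two steps in the diff context (wrap the line in a Map, then add a static field naming the origin) can be sketched with plain Java maps. `HoistInsertSketch` is a hypothetical helper that only mimics the effect of the two transformations; the field names `line` and `data_source` and the value `test-file-source` used in the note below are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HoistInsertSketch {
    // HoistField-like step: wrap the raw line in a Map under the given field name.
    static Map<String, Object> hoist(String fieldName, String line) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put(fieldName, line);
        return value;
    }

    // InsertField-like step: return a copy of the value with one extra static field.
    static Map<String, Object> insertStatic(Map<String, Object> value, String field, String staticValue) {
        Map<String, Object> updated = new LinkedHashMap<>(value);
        updated.put(field, staticValue);
        return updated;
    }
}
```

Calling `insertStatic(hoist("line", "foo"), "data_source", "test-file-source")` yields a map with the two entries `line=foo` and `data_source=test-file-source`.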
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425518608



##
File path: docs/connect.html
##
@@ -129,9 +129,11 @@ Transformations
 The file source connector reads each line as a String. We will wrap 
each line in a Map and then add a second field to identify the origin of the 
event. To do this, we use two transformations:
 
 HoistField to place the input line inside a Map
-InsertField to add the static field. In this example we'll 
indicate that the record came from a file connector
+InsertField to add the static field
 
 
+In this example we'll indicate that the record came from a file 
connector.

Review comment:
   fix the wrong format. 
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82002047-504dff00-968f-11ea-9332-9a28bd3fc1bb.png)
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82002116-85f2e800-968f-11ea-8d5f-4de48b5ddef1.png)
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425517884



##
File path: docs/connect.html
##
@@ -103,7 +103,7 @@ Configuring Connecto
 topics.regex - A Java regular expression of topics to 
use as input for this connector
 
 
-For any other options, you should consult the documentation for the 
connector.
+For any other options, you should consult the documentation for the connector.

Review comment:
   Add link to the connector config section.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82001955-0bc26380-968f-11ea-8271-08372e59c5b5.png)
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82002000-2f85a980-968f-11ea-9f2e-73b3ea01b9b0.png)
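
Since `topics.regex` is described in the diff context as a Java regular expression, a small sketch with `java.util.regex.Pattern` shows how such an expression selects topics. It assumes the whole topic name has to match the expression; `TopicsRegexSketch` and the topic names in the note below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class TopicsRegexSketch {
    // Returns the topics whose full name matches the given Java regular expression.
    static List<String> select(String topicsRegex, List<String> topics) {
        Pattern pattern = Pattern.compile(topicsRegex);
        List<String> selected = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                selected.add(topic);
            }
        }
        return selected;
    }
}
```

With the expression `metrics-.*`, the topics `metrics-cpu` and `metrics-mem` are selected and `logs` is not. Under the full-match assumption, the expression `metrics` alone selects neither.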
   
   













[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425517411



##
File path: docs/connect.html
##
@@ -32,7 +32,7 @@ 8.1 
Overview
 
 8.2 User Guide
 
-The quickstart provides a brief example of how to run a standalone 
version of Kafka Connect. This section describes how to configure, run, and 
manage Kafka Connect in more detail.
+The quickstart provides a brief example of 
how to run a standalone version of Kafka Connect. This section describes how to 
configure, run, and manage Kafka Connect in more detail.

Review comment:
   A first-time reader may not know where to find the **quickstart** for the 
example.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82001847-bbe39c80-968e-11ea-9de9-0bb5bd870cd6.png)
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82001879-d3bb2080-968e-11ea-9bd3-6d072fe9b3e8.png)
   









[GitHub] [kafka] mjsax commented on a change in pull request #8670: KAFKA-10001: Should trigger store specific callback if it is also a listener

2020-05-14 Thread GitBox


mjsax commented on a change in pull request #8670:
URL: https://github.com/apache/kafka/pull/8670#discussion_r425514013



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -522,6 +523,12 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
             // do not trigger restore listener if we are processing standby tasks
             if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
                 try {
+                    // first trigger the store's specific listener if its registered callback is also a listener,
+                    // then trigger the user registered global listener
+                    final StateRestoreCallback restoreCallback = changelogMetadata.storeMetadata.restoreCallback();
+                    if (restoreCallback instanceof StateRestoreListener)

Review comment:
   Nit: opening/closing `{ }` are missing

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -522,6 +523,12 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
             // do not trigger restore listener if we are processing standby tasks
             if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
                 try {
+                    // first trigger the store's specific listener if its registered callback is also a listener,
+                    // then trigger the user registered global listener
+                    final StateRestoreCallback restoreCallback = changelogMetadata.storeMetadata.restoreCallback();

Review comment:
   Nit: we already get `storeMetadata` above (and similarly below), and the 
`if` is redundant. Should we unify it?









[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

2020-05-14 Thread victor (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107816#comment-17107816
 ] 

victor commented on KAFKA-9981:
---

A new topic/partition is created, but its data is not synchronized.

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Priority: Major
>
> DistributedHerder.reconfigureConnector handles a task config update as follows:
> {code:java}
> if (changed) {
>     List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
>     if (isLeader()) {
>         configBackingStore.putTaskConfigs(connName, rawTaskProps);
>         cb.onCompletion(null, null);
>     } else {
>         // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
>         // addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
>         forwardRequestExecutor.submit(new Runnable() {
>             @Override
>             public void run() {
>                 try {
>                     String leaderUrl = leaderUrl();
>                     if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
>                         cb.onCompletion(new ConnectException("Request to leader to " +
>                                 "reconfigure connector tasks failed " +
>                                 "because the URL of the leader's REST interface is empty!"), null);
>                         return;
>                     }
>                     String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
>                     log.trace("Forwarding task configurations for connector {} to leader", connName);
>                     RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
>                     cb.onCompletion(null, null);
>                 } catch (ConnectException e) {
>                     log.error("Request to leader to reconfigure connector tasks failed", e);
>                     cb.onCompletion(e, null);
>                 }
>             }
>         });
>     }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a 
> topic whitelist update. If the task is not running on the leader node, an 
> HTTP request is sent to notify the leader of the configuration update. 
> However, a dedicated mm2 cluster does not have the HTTP server turned on, so 
> the request fails to be sent and the update operation is lost.
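
The failure mode described in the quoted report can be modeled with a small, self-contained sketch. This is not the Kafka Connect implementation: `TaskConfigForwardingSketch`, `LeaderClient`, `reconfigure` and `followerWithoutRestServer` are hypothetical names, the config store is reduced to a list of strings, and the unreachable REST endpoint is simulated by a client that always throws.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the leader/follower split; none of these names are Kafka APIs.
final class TaskConfigForwardingSketch {

    // Stand-in for the HTTP call a follower makes to the leader's REST interface.
    interface LeaderClient {
        void postTaskConfigs(String connName, List<String> taskConfigs) throws Exception;
    }

    // Returns true if the update was stored or handed to the leader, false if it was lost.
    static boolean reconfigure(final boolean isLeader,
                               final LeaderClient leaderClient,
                               final List<String> configStore,
                               final String connName,
                               final List<String> taskConfigs) {
        if (isLeader) {
            // The leader writes the new task configs directly to the backing store.
            configStore.addAll(taskConfigs);
            return true;
        }
        try {
            // A follower can only hand the update to the leader over REST.
            leaderClient.postTaskConfigs(connName, taskConfigs);
            return true;
        } catch (final Exception e) {
            // The error is surfaced, but in this model nothing retries, so the update is dropped.
            return false;
        }
    }

    // Models a follower in a cluster whose leader exposes no REST server.
    static boolean followerWithoutRestServer() {
        final List<String> store = new ArrayList<>();
        final LeaderClient unreachable = (connName, configs) -> {
            throw new IOException("no REST server on leader");
        };
        return reconfigure(false, unreachable, store, "mm2-connector",
                Collections.singletonList("topics=a,b"));
    }

    public static void main(final String[] args) {
        System.out.println("update delivered: " + followerWithoutRestServer());
    }
}
```

In this model the update survives only when the reconfiguring worker happens to be the leader, which matches the reported symptom of updates being lost on multi-node dedicated clusters.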



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10001) Store's own restore listener should be triggered in store changelog reader

2020-05-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10001:
--
Description: 
The `restoreCallback` passed to a Streams state store's `register()` function 
can also be a `StateRestoreListener`, in which case its corresponding 
`onRestoreStart` / `onRestoreEnd` / `onBatchRestored` methods should be triggered.

This is a regression in trunk: 2.5 has this logic right, but it has since regressed.
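
The intended dispatch can be sketched as follows. This is a minimal illustration under stated assumptions, not the Kafka Streams code: the nested `RestoreCallback` and `RestoreListener` interfaces are hypothetical, simplified stand-ins for the real callback and listener types, and `RestoreDispatchSketch`, `notifyRestoreStart`, `ListeningCallback` and `demo` are names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the nested interfaces only mimic the shape of the real Streams types.
final class RestoreDispatchSketch {

    // Simplified stand-in for a store's restore callback.
    interface RestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    // Simplified stand-in for a restore listener (only one of its methods is modeled).
    interface RestoreListener {
        void onRestoreStart(String storeName);
    }

    // Notifies the store's own callback first if it also implements the listener
    // interface, then notifies the user-registered global listener.
    static void notifyRestoreStart(final RestoreCallback storeCallback,
                                   final RestoreListener globalListener,
                                   final String storeName) {
        if (storeCallback instanceof RestoreListener) {
            ((RestoreListener) storeCallback).onRestoreStart(storeName);
        }
        globalListener.onRestoreStart(storeName);
    }

    // A store callback that is also a listener and records the events it sees.
    static final class ListeningCallback implements RestoreCallback, RestoreListener {
        private final List<String> events;

        ListeningCallback(final List<String> events) {
            this.events = events;
        }

        @Override
        public void restore(final byte[] key, final byte[] value) {
            // Restoring records is irrelevant to this sketch.
        }

        @Override
        public void onRestoreStart(final String storeName) {
            events.add("store:" + storeName);
        }
    }

    // Runs the dispatch once and returns the order in which listeners were notified.
    static List<String> demo() {
        final List<String> events = new ArrayList<>();
        final RestoreCallback callback = new ListeningCallback(events);
        final RestoreListener global = storeName -> events.add("global:" + storeName);
        notifyRestoreStart(callback, global, "my-store");
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

The same `instanceof` check would apply to the end-of-restore and per-batch notifications; only the start notification is modeled here to keep the sketch short.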

> Store's own restore listener should be triggered in store changelog reader
> --
>
> Key: KAFKA-10001
> URL: https://issues.apache.org/jira/browse/KAFKA-10001
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>
> The `restoreCallback` passed to a Streams state store's `register()` function 
> can also be a `StateRestoreListener`, in which case its corresponding 
> `onRestoreStart` / `onRestoreEnd` / `onBatchRestored` methods should be triggered.
> This is a regression in trunk: 2.5 has this logic right, but it has since regressed.





[jira] [Assigned] (KAFKA-10001) Store's own restore listener should be triggered in store changelog reader

2020-05-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-10001:
-

Assignee: Guozhang Wang

> Store's own restore listener should be triggered in store changelog reader
> --
>
> Key: KAFKA-10001
> URL: https://issues.apache.org/jira/browse/KAFKA-10001
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> The `restoreCallback` passed to a Streams state store's `register()` function 
> can also be a `StateRestoreListener`, in which case its corresponding 
> `onRestoreStart` / `onRestoreEnd` / `onBatchRestored` methods should be triggered.
> This is a regression in trunk: 2.5 has this logic right, but it has since regressed.





[jira] [Created] (KAFKA-10001) Store's own restore listener should be triggered in store changelog reader

2020-05-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10001:
-

 Summary: Store's own restore listener should be triggered in store 
changelog reader
 Key: KAFKA-10001
 URL: https://issues.apache.org/jira/browse/KAFKA-10001
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang








[GitHub] [kafka] guozhangwang opened a new pull request #8670: MINOR: Should trigger store specific callback if it is also a listener

2020-05-14 Thread GitBox


guozhangwang opened a new pull request #8670:
URL: https://github.com/apache/kafka/pull/8670


   The store's registered callback can also be a restore listener, in which 
case it should be triggered along with the user-specified global listener.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






