[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition

2019-10-31 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964576#comment-16964576
 ] 

Guozhang Wang commented on KAFKA-9073:
--

[~simplyamuthan] You can find the explanation in the above PR; it is actually a 
pretty straightforward bug.
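For readers who don't want to dig through the PR, the gist of the fix can be sketched as follows. This is a simplified stand-in, not the actual Kafka classes: a fetch-failure callback can fire after a rebalance has already removed the partition from the consumer's assignment, and the failure handler must tolerate that instead of looking up the now-missing assignment state (which is what raised the `IllegalStateException: No current assignment for partition` above). The class and method names here are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the guard pattern in the fix (hypothetical names, not
// the real SubscriptionState): a stale fetch-failure callback for a revoked
// partition is ignored instead of throwing IllegalStateException.
public class AssignmentGuard {
    private final Map<String, Long> assignment = new HashMap<>();

    public void assign(String partition, long position) {
        assignment.put(partition, position);
    }

    public void revoke(String partition) {
        assignment.remove(partition);
    }

    // Before the fix: blindly look up the partition's assigned state and
    // throw "No current assignment for partition ..." when it is gone.
    // After the fix: check assignment first and ignore stale callbacks.
    public boolean requestFailed(String partition) {
        if (!assignment.containsKey(partition)) {
            return false; // callback raced with a rebalance; nothing to do
        }
        // ... mark the fetch position for this partition as failed ...
        return true;
    }
}
```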

> Kafka Streams State stuck in rebalancing after one of the StreamThread 
> encounters java.lang.IllegalStateException: No current assignment for 
> partition
> --
>
> Key: KAFKA-9073
> URL: https://issues.apache.org/jira/browse/KAFKA-9073
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: amuthan Ganeshan
>Priority: Major
> Fix For: 2.4.0
>
> Attachments: KAFKA-9073.log
>
>
> I have a Kafka Streams application that stores the incoming messages into a 
> state store; later, during punctuation, we process the messages and store 
> them into a big-data persistent store.
> The application consumes from 120 partitions distributed across 40 instances. 
> The application has been running fine without any problem for months, but all 
> of a sudden some of the instances failed because of a stream thread exception 
> saying  
> ```java.lang.IllegalStateException: No current assignment for partition 
> --changelog-98```
>  
> And other instances are stuck in the REBALANCING state and never come out 
> of it. Here is the full stack trace; I masked the application-specific 
> app name and store name in the stack trace due to an NDA.
>  
> ```
> 2019-10-21 13:27:13,481 ERROR 
> [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] 
> [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread 
> [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] 
> Encountered the following error during processing:
> java.lang.IllegalStateException: No current assignment for partition 
> application.id-store_name-changelog-98
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> ```
>  
> Now I checked the state store disk usage; it is less than 40% of the total 
> disk space available. Restarting the application solves the problem for a 
> short time, but the error quickly pops up again on other 
> instances. I tried changing the retries and retry.backoff.ms 
> configuration, but it did not help at all:
> ```
> retries = 2147483647
> retry.backoff.ms
> ```
> After googling for some time I found there was a similar bug reported to the 
> Kafka team in the past, and also noticed that my stack trace exactly matches 
> the stack trace of the reported bug.
> Here is the link for the 

[jira] [Resolved] (KAFKA-9114) Kafka broker fails to establish secure zookeeper connection via SSL.

2019-10-31 Thread Gangadhar Balikai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gangadhar Balikai resolved KAFKA-9114.
--
Resolution: Invalid

> Kafka broker fails to establish secure zookeeper connection via SSL.
> 
>
> Key: KAFKA-9114
> URL: https://issues.apache.org/jira/browse/KAFKA-9114
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Gangadhar Balikai
>Priority: Minor
>
> When I try to enable TLS/SSL between a Kafka broker (tried 2.3.0 and 2.3.1) 
> and a 3-node zookeeper (3.5.5 and 3.5.6) cluster, 
> the kafka broker fails with the following stack trace. The stack trace and 
> the kafka & zookeeper configurations used are given below.
> *JDK*: 1_8_0_161_64
> {color:#de350b}[2019-10-30 03:52:10,036] ERROR Fatal error during KafkaServer 
> startup. Prepare to shutdown (kafka.server.KafkaServer){color}
> {color:#de350b}java.io.IOException: Couldn't instantiate 
> org.apache.zookeeper.ClientCnxnSocketNetty{color}
> {color:#de350b} at 
> org.apache.zookeeper.ZooKeeper.getClientCnxnSocket(ZooKeeper.java:1851){color}
> {color:#de350b} at 
> org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:453){color}
> {color:#de350b} at 
> org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:384){color}
> {color:#de350b} at 
> kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:103){color}
> {color:#de350b} at 
> kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826){color}
> {color:#de350b} at 
> kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:364){color}
> {color:#de350b} at 
> kafka.server.KafkaServer.initZkClient(KafkaServer.scala:387){color}
> {color:#de350b} at 
> kafka.server.KafkaServer.startup(KafkaServer.scala:207){color}
> {color:#de350b} at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38){color}
> {color:#de350b} at kafka.Kafka$.main(Kafka.scala:84){color}
> {color:#de350b} at kafka.Kafka.main(Kafka.scala){color}
> {color:#de350b}Caused by: java.lang.NoSuchMethodException: 
> org.apache.zookeeper.ClientCnxnSocketNetty.<init>(){color}
> {color:#de350b} at java.lang.Class.getConstructor0(Class.java:3082){color}
> {color:#de350b} at 
> java.lang.Class.getDeclaredConstructor(Class.java:2178){color}
> {color:#de350b} at 
> org.apache.zookeeper.ZooKeeper.getClientCnxnSocket(ZooKeeper.java:1848){color}
> {color:#de350b} ... 10 more{color}
> {color:#de350b}[2019-10-30 03:52:10,039] INFO shutting down 
> (kafka.server.KafkaServer){color}
> {color:#de350b}[2019-10-30 03:52:10,046] INFO shut down completed 
> (kafka.server.KafkaServer){color}
> {color:#de350b}[2019-10-30 03:52:10,046] ERROR Exiting Kafka. 
> (kafka.server.KafkaServerStartable){color}
> {color:#de350b}[2019-10-30 03:52:10,048] INFO shutting down 
> (kafka.server.KafkaServer){color}
> STEPS.
> 1) I copied the following zookeeper dependencies into the kafka bin: 
> a) kafka 2.3.0 and zookeeper 3.5.5:
> "zookeeper-3.5.6.jar" "zookeeper-jute-3.5.6.jar" "netty*.jar" 
> "commons-cli-1.2.jar"
> b) kafka 2.3.1 and zookeeper 3.5.6:
> "zookeeper-3.5.6.jar" "zookeeper-jute-3.5.6.jar" 
> "netty-buffer-4.1.42.Final.jar" "netty-buffer-4.1.42.Final.LICENSE.txt" 
> "netty-codec-4.1.42.Final.jar" "netty-codec-4.1.42.Final.LICENSE.txt" 
> "netty-common-4.1.42.Final.jar" "netty-common-4.1.42.Final.LICENSE.txt" 
> "netty-handler-4.1.42.Final.jar" "netty-handler-4.1.42.Final.LICENSE.txt" 
> "netty-resolver-4.1.42.Final.jar" "netty-resolver-4.1.42.Final.LICENSE.txt" 
> "netty-transport-4.1.42.Final.jar" "netty-transport-4.1.42.Final.LICENSE.txt" 
> "netty-transport-native-epoll-4.1.42.Final.jar" 
> "netty-transport-native-epoll-4.1.42.Final.LICENSE.txt" 
> "netty-transport-native-unix-common-4.1.42.Final.jar" 
> "netty-transport-native-unix-common-4.1.42.Final.LICENSE.txt" 
> "commons-cli-1.2.jar"
> *2) Configurations:* 
> The *zookeeper* cluster looks healthy with the following 
> configuration in *zoo.conf*:
> {color:#505f79}quorum.auth.server.loginContext=QuorumServer{color}
> {color:#505f79}quorum.auth.learner.loginContext=QuorumLearner{color}
> {color:#505f79}syncLimit=2{color}
> {color:#505f79}tickTime=2000{color}
> {color:#505f79}server.3=broker1\:2888\:3888{color}
> {color:#505f79}server.2=broker2\:2888\:3888{color}
> {color:#505f79}server.1=broker3\:2888\:3888{color}
> {color:#505f79}authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider{color}
> {color:#505f79}initLimit=10{color}
> {color:#505f79}secureClientPort=2281{color}
> {color:#505f79}quorum.auth.learnerRequireSasl=true{color}
> {color:#505f79}quorum.auth.enableSasl=true{color}
> {color:#505f79}quorum.auth.kerberos.servicePrincipal=servicename/_HOST{color}
> {color:#505f79}quorum.cnxn.threads.size=20{color}
> {color:#505f79}zookeeper.client.secure=true{color}
> {color:#505f79}quorum.auth.serverRequireSasl=true{color}
> 

[jira] [Commented] (KAFKA-9114) Kafka broker fails to establish secure zookeeper connection via SSL.

2019-10-31 Thread Gangadhar Balikai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964570#comment-16964570
 ] 

Gangadhar Balikai commented on KAFKA-9114:
--

Found the issue on my side.
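The `NoSuchMethodException: org.apache.zookeeper.ClientCnxnSocketNetty.<init>()` in the log above is characteristic of a client-jar mismatch: ZooKeeper instantiates the client socket class via reflection through its no-arg constructor, so if the jar on the broker's classpath only declares a constructor with a different signature, the failure surfaces only at startup, as an `IOException: Couldn't instantiate ...` wrapping the reflection error. The sketch below reproduces that failure mode with a stand-in class (the real culprit class and the exact ZooKeeper constructor signatures are not shown in this thread, so treat the details as assumptions).

```java
// Sketch of the reflective-instantiation failure behind the broker log above
// (hypothetical stand-in class; the real one is
// org.apache.zookeeper.ClientCnxnSocketNetty).
public class ReflectiveInstantiation {

    // Stand-in for a socket class built against a different client version:
    // it declares only a one-arg constructor, so the no-arg lookup fails.
    public static class SocketWithConfigCtor {
        public SocketWithConfigCtor(String clientConfig) { }
    }

    // Mirrors the pattern in ZooKeeper.getClientCnxnSocket: resolve the
    // class by name/reference, then call its no-arg constructor reflectively.
    public static Object instantiate(Class<?> clazz) throws Exception {
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static boolean failsWithNoSuchMethod(Class<?> clazz) {
        try {
            instantiate(clazz);
            return false;
        } catch (NoSuchMethodException e) {
            return true; // same failure mode as in the broker startup log
        } catch (Exception e) {
            return false;
        }
    }
}
```

This is why the issue was resolved as Invalid: the fix is to put matching ZooKeeper client jars on the classpath, not a Kafka code change.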


[jira] [Created] (KAFKA-9127) Needless group coordination overhead for GlobalKTables

2019-10-31 Thread Chris Toomey (Jira)
Chris Toomey created KAFKA-9127:
---

 Summary: Needless group coordination overhead for GlobalKTables
 Key: KAFKA-9127
 URL: https://issues.apache.org/jira/browse/KAFKA-9127
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.3.0
Reporter: Chris Toomey


When creating a simple stream topology to just populate a GlobalKTable, I 
noticed from logging that the stream consumer was doing group coordination 
requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to 
do since the global consumer thread populating the table fetches from all 
partitions and thus doesn't use the group requests. So this adds needless 
overhead on the client, network, and server.

I tracked this down to the stream thread consumer, which is created regardless 
of whether it's needed, based solely on NUM_STREAM_THREADS_CONFIG, which 
defaults to 1.

I found that setting NUM_STREAM_THREADS_CONFIG to 0 will prevent this from 
happening, but it'd be a worthwhile improvement to be able to override this 
setting in cases of topologies like this that don't have any need for stream 
threads. Hence this ticket.

I originally asked about this on the users mailing list where Bruno suggested I 
file it as an improvement request.

Here's the Scala code that I'm using that exhibits this:
{code:scala}
val builder: StreamsBuilder = new StreamsBuilder()
val gTable = builder.globalTable[K, V](...)
val stream = new KafkaStreams(builder.build(), props)
stream.start(){code}
 Not shown is the state store that I'm populating/using.
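The workaround described above can be sketched as a configuration change rather than a code change. This is a hedged sketch: it assumes the standard Streams config key string `"num.stream.threads"` (the value of `StreamsConfig.NUM_STREAM_THREADS_CONFIG`), and uses plain `Properties` so the snippet stands alone without the Kafka client libraries.

```java
import java.util.Properties;

// Sketch of the workaround for a GlobalKTable-only topology: with
// num.stream.threads set to 0, Streams creates no StreamThreads, so no
// stream-thread consumer joins a group (no JoinGroup/SyncGroup traffic);
// the global-state thread still populates the table, since it fetches
// from all partitions without group coordination.
public class GlobalTableOnlyConfig {
    public static Properties streamsProps(String appId, String bootstrap) {
        Properties props = new Properties();
        props.put("application.id", appId);      // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrap);
        props.put("num.stream.threads", "0");    // no StreamThreads, global thread only
        return props;
    }
}
```

These properties would be passed as the second argument to the `new KafkaStreams(builder.build(), props)` call shown above.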



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id

2019-10-31 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-9093.
---
Resolution: Fixed

> NullPointerException in KafkaConsumer with group.instance.id
> 
>
> Key: KAFKA-9093
> URL: https://issues.apache.org/jira/browse/KAFKA-9093
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: Rolef Heinrich
>Assignee: huxihx
>Priority: Minor
> Fix For: 2.3.2
>
>
> When using *group.instance.id=myUniqId[0]*, the KafkaConsumer constructor 
> throws a NullPointerException in close():
>  
> {code:java}
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
> {code}
> {{It turns out that the exception is thrown because the *log* member is not 
> yet initialized (still null) in the constructor when the original exception 
> is handled; the original exception is thrown before *log* is initialized.}}
> {{The side effect of this error is that close() does not clean up 
> resources as it is supposed to do.}}
> *{{The used consumer properties for reference:}}*
>  
> {code:java}
> key.deserializer=com.ibm.streamsx.kafka.serialization
> request.timeout.ms=25000
> value.deserializer=com.ibm.streamsx.kafka.serialization
> client.dns.lookup=use_all_dns_ips
> metadata.max.age.ms=2000
> enable.auto.commit=false
> group.instance.id=myUniqId[0]
> max.poll.interval.ms=30
> group.id=consumer-0
> metric.reporters=com.ibm.streamsx.kafka.clients.consum...
> reconnect.backoff.max.ms=1
> bootstrap.servers=localhost:9092
> max.poll.records=50
> session.timeout.ms=2
> client.id=C-J37-ReceivedMessages[0]
> allow.auto.create.topics=false
> metrics.sample.window.ms=1
> retry.backoff.ms=500
> reconnect.backoff.ms=250{code}
> *Expected behaviour:* throw exception indicating that something is wrong with 
> the chosen group.instance.id.
> The documentation does not tell anything about valid values for 
> group.instance.id.
> *Reproduce:*
>  
>  
> {code:java}
>  
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> public class Main {
> public static void main(String[] args) {
> Properties props = new Properties();
> props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put (ConsumerConfig.GROUP_ID_CONFIG, "group-Id1");
> props.put (ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "myUniqId[0]");
> props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.StringDeserializer");
> KafkaConsumer c = new KafkaConsumer (props);
> }
> }
> Exception in thread "main" java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
>   at Main.main(Main.java:15)
> {code}
>  
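The masking pattern the reporter describes can be reproduced without Kafka at all. The sketch below is a simplified, hypothetical stand-in for the KafkaConsumer constructor, not the real code: construction fails before the `log` field is assigned, the catch block calls `close()`, and the resulting `NullPointerException` replaces the original, more informative exception. The "guarded" variant shows the obvious fix of tolerating a partially constructed instance in the cleanup path.

```java
// Simplified stand-in for the constructor/cleanup pattern behind the NPE
// above (hypothetical class, not the real KafkaConsumer): when construction
// fails before 'log' is assigned, the unguarded cleanup path dereferences
// null and the NPE masks the original error.
public class ConstructorCleanup {
    private StringBuilder log; // stands in for the Logger field

    public ConstructorCleanup(boolean failEarly, boolean guarded) {
        try {
            if (failEarly) {
                // e.g. an invalid group.instance.id detected during setup
                throw new IllegalArgumentException("invalid group.instance.id");
            }
            this.log = new StringBuilder();
        } catch (Throwable t) {
            close(guarded); // cleanup runs while 'log' is still null
            throw t;
        }
    }

    private void close(boolean guarded) {
        if (guarded && log == null) {
            return; // fixed version: tolerate a partially built instance
        }
        log.append("closing"); // unguarded version: NPE here, masking the cause
    }

    // Returns the exception type the caller actually observes.
    public static Class<? extends Throwable> failureType(boolean guarded) {
        try {
            new ConstructorCleanup(true, guarded);
            return null;
        } catch (Throwable t) {
            return t.getClass();
        }
    }
}
```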





[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition

2019-10-31 Thread amuthan Ganeshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964526#comment-16964526
 ] 

amuthan Ganeshan commented on KAFKA-9073:
-

I am more interested in understanding the root cause, [~guozhang]; could you 
please explain the scenario? Also, is this bug fix going to be in a minor 
release under 2.3?


[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition

2019-10-31 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964486#comment-16964486
 ] 

ASF GitHub Bot commented on KAFKA-9073:
---

guozhangwang commented on pull request #7630: KAFKA-9073: check assignment in 
requestFailed to avoid NPE
URL: https://github.com/apache/kafka/pull/7630
 
 
   This is a cherry-pick of the bug-fix included in 
https://github.com/apache/kafka/pull/6884 to the 2.3 and older branches.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state

2019-10-31 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964469#comment-16964469
 ] 

ASF GitHub Bot commented on KAFKA-8972:
---

guozhangwang commented on pull request #7617: KAFKA-8972 (2.4 blocker): bug fix 
for restoring task
URL: https://github.com/apache/kafka/pull/7617
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback 
> state
> 
>
> Key: KAFKA-8972
> URL: https://issues.apache.org/jira/browse/KAFKA-8972
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Blocker
> Fix For: 2.4.0
>
>
> Our current implementation ordering of {{KafkaConsumer.unsubscribe}} is the 
> following:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> And inside {{onLeavePrepare}} we would look at the assignment, try to revoke 
> the owned partitions and notify users via {{RebalanceListener#onPartitionsRevoked}}, 
> and then clear the assignment.
> However, the subscription's assignment is already cleared in 
> {{this.subscriptions.unsubscribe();}}, which means the user's rebalance listener 
> would never be triggered. In other words, from the consumer client's point of 
> view nothing is owned after unsubscribe, but from the user caller's point of 
> view the partitions are not revoked yet. For callers like Kafka Streams, which 
> rely on the rebalance listener to maintain their internal state, this leads to 
> inconsistent state management and failure cases.
> Before KIP-429 this issue was hidden away, since every time the consumer 
> re-joined the group later it would still revoke everything anyway, regardless 
> of the passed-in parameters of the rebalance listener; with KIP-429 this is 
> easier to reproduce.
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then 
> `subscription.unsubscribe`. This way we are guaranteed that the streams' tasks 
> are all closed as revoked by then.
> • [Optimization] If the generation is reset due to a fatal error from the 
> join / heartbeat response etc., then we know that all partitions are lost, and 
> we should not trigger `onPartitionsRevoked`, but instead just `onPartitionsLost` 
> inside `onLeavePrepare`.
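
The fix in the first bullet is essentially a reordering. Below is a toy model of the two orderings; the class names mirror `SubscriptionState` and the coordinator for readability, but this is an illustration under simplified assumptions, not the actual client code:

```java
import java.util.ArrayList;
import java.util.List;

public class UnsubscribeOrder {
    interface RebalanceListener { void onPartitionsRevoked(List<String> partitions); }

    static class SubscriptionState {
        List<String> assignment = new ArrayList<>(List.of("topic-0", "topic-1"));
        void unsubscribe() { assignment.clear(); }
    }

    static class Coordinator {
        // onLeavePrepare revokes whatever is *currently* assigned
        void onLeavePrepare(SubscriptionState subs, RebalanceListener listener) {
            if (!subs.assignment.isEmpty()) {
                listener.onPartitionsRevoked(new ArrayList<>(subs.assignment));
            }
        }
    }

    static List<String> revoked = new ArrayList<>();

    public static void main(String[] args) {
        RebalanceListener listener = revoked::addAll;

        // Buggy order: the assignment is cleared first, so the listener never fires.
        SubscriptionState subs = new SubscriptionState();
        subs.unsubscribe();
        new Coordinator().onLeavePrepare(subs, listener);
        System.out.println("buggy order revoked: " + revoked);   // []

        // Fixed order: revoke (and notify) first, then clear the subscription.
        revoked.clear();
        subs = new SubscriptionState();
        new Coordinator().onLeavePrepare(subs, listener);
        subs.unsubscribe();
        System.out.println("fixed order revoked: " + revoked);   // [topic-0, topic-1]
    }
}
```

With the buggy order the listener observes no partitions to revoke, which is exactly the inconsistency described above.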



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9113) Clean up task management

2019-10-31 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964468#comment-16964468
 ] 

Guozhang Wang commented on KAFKA-9113:
--

Also related to the above call: we should refactor the {{StreamTask.close}} 
function to be more robust, so that it closes only what is needed for a 
restoring task, a suspended task, etc. More specifically, we consider a task 
to be in one of the following states:

1. created: topology not initialized, states not initialized. -> on close, 
basically nothing needs to be done
2. restoring: topology not initialized, states initialized. -> on close, only 
the state manager needs to be closed
3. running: topology initialized, states initialized. -> on close, first 
suspend (close the topology), and then close as suspended
4. suspended: topology closed, states not closed. -> similar to restoring: we 
just call closeSuspended, which only needs to close the state manager
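
The four close paths above can be sketched as a per-state dispatch. This is a toy illustration; the method-name strings follow the discussion, not the actual StreamTask API:

```java
public class TaskCloseDispatch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED }

    // Return which close work each state actually needs, per the list above.
    static String close(State state) {
        switch (state) {
            case CREATED:   // nothing was initialized -> nothing to do
                return "noop";
            case RESTORING: // only the state manager was initialized
                return "closeStateManager";
            case RUNNING:   // suspend (close topology) first, then close suspended
                return "suspend + closeSuspended";
            case SUSPENDED: // topology already closed, like restoring
                return "closeSuspended";
            default:
                throw new IllegalStateException("unknown state: " + state);
        }
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            System.out.println(s + " -> " + close(s));
        }
    }
}
```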

> Clean up task management
> 
>
> Key: KAFKA-9113
> URL: https://issues.apache.org/jira/browse/KAFKA-9113
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Alongside KIP-429 we did a lot of refactoring of the task management classes, 
> including the TaskManager and AssignedTasks (and children). While hopefully 
> easier to reason about, there's still significant opportunity for further 
> cleanup, including safer state tracking. Some potential improvements:
> 1) Verify that no tasks are ever in more than one state at once. One 
> possibility is to just check that the suspended, created, restoring, and 
> running maps are all disjoint, but this begs the question of when and where 
> to do those checks, and how often. Another idea might be to put all tasks 
> into a single map and just track their state on a per-task basis. Whatever it 
> is should be aware that some methods are on the critical code path, and 
> should not be burdened with excessive safety checks (ie 
> AssignedStreamTasks#process). Alternatively, it seems to make sense to just 
> make each state its own type. We can then do some cleanup of the AbstractTask 
> and StreamTask classes, which currently contain a number of methods specific 
> to only one type/state of task. For example
>  * only active running tasks ever need to be suspendable, yet every task goes 
> through suspend and then closeSuspended during close.
>  * as the name suggests, closeSuspended should technically only ever apply to 
> suspended tasks
>  * the code paths needed to perform certain actions such as closing or 
> committing a task vary widely between the different states. A restoring task 
> need only close its state manager, but skipping the task.close call and 
> calling only closeStateManager has led to confusion and time wasted trying 
> to remember or ask someone why that is sufficient
> 2) Cleanup of closing and/or shutdown logic – there are some potential 
> improvements to be made here as well, for example AssignedTasks currently 
> implements a closeZombieTask method despite the fact that standby tasks are 
> never zombies. 
> 3)  The StoreChangelogReader also interacts with (only) the 
> AssignedStreamsTasks class, through the TaskManager. It can be difficult to 
> reason about these interactions and the state of the changelog reader.
> 4) All 4 classes and their state have very strict consistency requirements 
> that currently are almost impossible to verify, which has already resulted in 
> several bugs that we were lucky to catch in time. We should tighten up how 
> these classes manage their own state, and how the overall state is managed 
> between them, so that it is easy to make changes without introducing new bugs 
> because one class updated its own state without knowing it needed to tell 
> another class to also update its
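
The two bookkeeping options in item 1) can be sketched as follows; the names and types here are hypothetical, not Kafka code. Option A keeps four maps and checks that they are pairwise disjoint; option B keeps a single map so a task can only ever be in one state by construction:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TaskStateTracking {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED }

    // Option A: verify that no task id appears in more than one per-state map.
    static boolean disjoint(List<Map<Integer, String>> maps) {
        Set<Integer> seen = new HashSet<>();
        for (Map<Integer, String> m : maps) {
            for (Integer id : m.keySet()) {
                if (!seen.add(id)) return false; // duplicate -> task in two states
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, String> created = Map.of(1, "task-1");
        Map<Integer, String> running = Map.of(2, "task-2");
        System.out.println(disjoint(List.of(created, running))); // true

        // Option B: one map, one state per task; a transition just overwrites.
        Map<Integer, State> tasks = new HashMap<>();
        tasks.put(1, State.CREATED);
        tasks.put(1, State.RUNNING); // no second map to forget to update
        System.out.println(tasks.get(1)); // RUNNING
    }
}
```

The trade-off noted in the ticket applies to option A: the disjointness check costs a pass over every map, so it should not sit on hot paths like `AssignedStreamTasks#process`.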





[jira] [Commented] (KAFKA-9113) Clean up task management

2019-10-31 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964462#comment-16964462
 ] 

Guozhang Wang commented on KAFKA-9113:
--

Dumping my thoughts here: for closing as zombies:

created : call close
restoring : call closeStateMgr
running : call close
suspended : call closeSuspended
3 and 4 make sense to me, but 1 and 2 are a bit weird: for created tasks we 
initialize neither the topology nor the state manager, but we still call close, 
whereas for restoring tasks (where we initialized the state manager but not the 
topology) we call closeStateMgr.

Taking a closer look at the code base, I think the reason is that 
closeNonRunningTasks (which is triggered in onPartitionsRevoked) actually 
calls:

task.close(false /* clean */, false /* isZombie */)

I.e. we treat it as an "unclean" close, and hence we won't write a checkpoint; 
most importantly, when closeTopology throws (which would be the case since the 
topology is not initialized at all) we just ignore it. So "accidentally" this 
becomes correct.

This is not a great pattern, and I like @ableegoldman's suggestion that we 
should consider making the "close" call more robust in a follow-up PR; for 
now let's stay with what it is.
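
A minimal sketch of the "accidental correctness" described above: with clean == false, the failure from an uninitialized topology is swallowed instead of propagated. The names here are hypothetical stand-ins, not the real StreamTask code:

```java
public class UncleanClose {
    static boolean topologyInitialized = false;

    // Stand-in for closing the topology of a task that was never initialized.
    static void closeTopology() {
        if (!topologyInitialized) {
            throw new IllegalStateException("topology not initialized");
        }
    }

    static String close(boolean clean) {
        try {
            closeTopology();
        } catch (RuntimeException e) {
            if (clean) throw e; // a clean close would propagate the failure
            // unclean close: the error is ignored, and no checkpoint is written
        }
        return clean ? "clean close" : "unclean close (errors ignored)";
    }

    public static void main(String[] args) {
        System.out.println(close(false)); // unclean close (errors ignored)
    }
}
```

This is why `task.close(false, false)` happens to do the right thing for created tasks, even though nothing was designed with that state in mind.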



[jira] [Resolved] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition

2019-10-31 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9073.
--
Resolution: Fixed

> Kafka Streams State stuck in rebalancing after one of the StreamThread 
> encounters java.lang.IllegalStateException: No current assignment for 
> partition
> --
>
> Key: KAFKA-9073
> URL: https://issues.apache.org/jira/browse/KAFKA-9073
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: amuthan Ganeshan
>Priority: Major
> Fix For: 2.4.0
>
> Attachments: KAFKA-9073.log
>
>
> I have a Kafka stream application that stores the incoming messages into a 
> state store, and later during the punctuation period, we store them into a 
> big data persistent store after processing the messages.
> The application consumes from 120 partitions distributed across 40 instances. 
> The application has been running fine without any problem for months, but all 
> of a sudden some of the instances failed because of a stream thread exception 
> saying  
> ```java.lang.IllegalStateException: No current assignment for partition 
> --changelog-98```
>  
> And other instances are stuck in the REBALANCING state, and never come out 
> of it. Here is the full stack trace, I just masked the application-specific 
> app name and store name in the stack trace due to NDA.
>  
> ```
> 2019-10-21 13:27:13,481 ERROR 
> [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] 
> [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread 
> [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] 
> Encountered the following error during processing:
> java.lang.IllegalStateException: No current assignment for partition 
> application.id-store_name-changelog-98
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> ```
>  
> Now I checked the state store disk usage; it is less than 40% of the total 
> disk space available. Restarting the application solves the problem for a 
> short amount of time, but the error pops up again randomly on other 
> instances quickly. I tried to change the retries and retry.backoff.ms 
> configuration, but it did not help at all
> ```
> retries = 2147483647
> retry.backoff.ms
> ```
> After googling for some time I found there was a similar bug reported to the 
> Kafka team in the past, and also notice my stack trace is exactly matching 
> with the stack trace of the reported bug.
> Here is the link for the bug reported on a comparable basis a year ago.
> https://issues.apache.org/jira/browse/KAFKA-7181
>  
> Now I am wondering is 

[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition

2019-10-31 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964437#comment-16964437
 ] 

Guozhang Wang commented on KAFKA-9073:
--

We can resolve this as fixed in 2.4.0 only for now; once I have the other PR 
merged in, I will update the ticket.


[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition

2019-10-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964436#comment-16964436
 ] 

Matthias J. Sax commented on KAFKA-9073:


[~simplyamuthan] [~ableegoldman] [~guozhang] -- should we close this ticket as 
fixed?


[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext

2019-10-31 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964422#comment-16964422
 ] 

Sophie Blee-Goldman commented on KAFKA-9088:


It's still pretty awkward to make users initialize stores when unit testing, 
especially since we explicitly forbid and prevent them from doing so in "real" 
code. But I've been thinking: regardless of how we want to solve the store 
unit-testing problem, we should go ahead and put the new 
MockInternalProcessorContext (or whatever it'll be called) in the test-utils 
package. It doesn't seem that unlikely we might want or need it for something 
else in the future, given how ubiquitous the processor context is.

> Consolidate InternalMockProcessorContext and MockInternalProcessorContext
> -
>
> Key: KAFKA-9088
> URL: https://issues.apache.org/jira/browse/KAFKA-9088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Bruno Cadonna
>Priority: Minor
>  Labels: newbie
>
> Currently, we have two mocks for the {{InternalProcessorContext}}. The goal 
> of this ticket is to merge both into one mock or replace it with an 
> {{EasyMock}} mock. 





[jira] [Commented] (KAFKA-9072) Add Section to Streams Developer Guide for Topology Naming (KIP-307)

2019-10-31 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964423#comment-16964423
 ] 

ASF GitHub Bot commented on KAFKA-9072:
---

bbejeck commented on pull request #7629: KAFKA-9072: Add Topology naming to the 
dev guide
URL: https://github.com/apache/kafka/pull/7629
 
 
 



> Add Section to Streams Developer Guide for Topology Naming (KIP-307)
> 
>
> Key: KAFKA-9072
> URL: https://issues.apache.org/jira/browse/KAFKA-9072
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>  Labels: docs
> Fix For: 2.5.0
>
>
> With KIP-307, users can name operators in a topology. Naming is important as 
> it can help pin state store, changelog topic, and repartition topic names, 
> keeping the topology robust in the face of adding/removing operators in a 
> Kafka Streams DSL. We should add a section to the developer guide to 
> explain why this is important.





[jira] [Updated] (KAFKA-9113) Clean up task management

2019-10-31 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9113:
---
Description: 
Along KIP-429 we did a lot of refactoring of the task management classes, 
including the TaskManager and AssignedTasks (and children).  While hopefully 
easier to reason about there's still significant opportunity for further 
cleanup including safer state tracking.  Some potential improvements:

1) Verify that no tasks are ever in more than one state at once. One 
possibility is to just check that the suspended, created, restoring, and 
running maps are all disjoint, but this begs the question of when and where to 
do those checks, and how often. Another idea might be to put all tasks into a 
single map and just track their state on a per-task basis. Whatever it is 
should be aware that some methods are on the critical code path, and should not 
be burdened with excessive safety checks (ie AssignedStreamTasks#process). 
Alternatively, it seems to make sense to just make each state its own type. We 
can then do some cleanup of the AbstractTask and StreamTask classes, which 
currently contain a number of methods specific to only one type/state of task. 
For example
 * only active running tasks ever need to be suspendable, yet every task goes 
through suspend and then closeSuspended during close.
 * as the name suggests, closeSuspended should technically only ever apply to 
suspended tasks
 * the code paths needed to perform certain actions such as closing or 
committing a task vary widely between the different states. A restoring task 
need only close its state manager, but skipping the task.close call and calling 
only closeStateManager has led to confusion and time wasted trying to remember 
or ask someone why that is sufficient

2) Cleanup of closing and/or shutdown logic – there are some potential 
improvements to be made here as well, for example AssignedTasks currently 
implements a closeZombieTask method despite the fact that standby tasks are 
never zombies. 

3)  The StoreChangelogReader also interacts with (only) the 
AssignedStreamsTasks class, through the TaskManager. It can be difficult to 
reason about these interactions and the state of the changelog reader.

4) All 4 classes and their state have very strict consistency requirements that 
currently are almost impossible to verify, which has already resulted in 
several bugs that we were lucky to catch in time. We should tighten up how 
these classes manage their own state, and how the overall state is managed 
between them, so that it is easy to make changes without introducing new bugs 
because one class updated its own state without knowing it needed to tell 
another class to also update its

  was:
Along KIP-429 we did a lot of refactoring of the task management classes, 
including the TaskManager and AssignedTasks (and children).  While hopefully 
easier to reason about there's still significant opportunity for further 
cleanup including safer state tracking.  Some potential improvements:

1) Verify that no tasks are ever in more than one state at once. One 
possibility is to just check that the suspended, created, restoring, and 
running maps are all disjoint, but this begs the question of when and where to 
do those checks, and how often. Another idea might be to put all tasks into a 
single map and just track their state on a per-task basis. Whatever it is 
should be aware that some methods are on the critical code path, and should not 
be burdened with excessive safety checks (ie AssignedStreamTasks#process)

2) Cleanup of closing and/or shutdown logic – there are some potential 
improvements to be made here as well, for example AssignedTasks currently 
implements a closeZombieTask method despite the fact that standby tasks are 
never zombies. 

3)  The StoreChangelogReader also interacts with (only) the 
AssignedStreamsTasks class, through the TaskManager. It can be difficult to 
reason about these interactions and the state of the changelog reader.

4) All 4 classes and their state have very strict consistency requirements that 
currently are almost impossible to verify, which has already resulted in 
several bugs that we were lucky to catch in time. We should tighten up how 
these classes manage their own state, and how the overall state is managed 
between them, so that it is easy to make changes without introducing new bugs 
because one class updated its own state without knowing it needed to tell 
another class to also update its


> Clean up task management
> 
>
> Key: KAFKA-9113
> URL: https://issues.apache.org/jira/browse/KAFKA-9113
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>   

[jira] [Commented] (KAFKA-9080) System Test Failure: MessageFormatChangeTest.testCompatibilty

2019-10-31 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964417#comment-16964417
 ] 

ASF GitHub Bot commented on KAFKA-9080:
---

tuvtran commented on pull request #7628: KAFKA-9080: Addresses 
MessageFormatChangeTest.testCompatibilty with version 0.9.0.1
URL: https://github.com/apache/kafka/pull/7628
 
 
   TODO
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> System Test Failure: MessageFormatChangeTest.testCompatibilty
> -
>
> Key: KAFKA-9080
> URL: https://issues.apache.org/jira/browse/KAFKA-9080
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Manikumar
>Assignee: Tu Tran
>Priority: Blocker
> Fix For: 2.4.0
>
>
> MessageFormatChangeTest tests are failing on 2.4 and trunk for 0.9.0.1 
> version.
> http://confluent-kafka-2-4-system-test-results.s3-us-west-2.amazonaws.com/2019-10-21--001.1571716233--confluentinc--2.4--cb4944f/report.html
> {code}
> Module: kafkatest.tests.client.message_format_change_test
> Class:  MessageFormatChangeTest
> Method: test_compatibility
> Arguments:
> {
>   "consumer_version": "0.9.0.1",
>   "producer_version": "0.9.0.1"
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8737) TaskMigrated Exception while rebalancing kafka streams

2019-10-31 Thread KUMAR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964410#comment-16964410
 ] 

KUMAR commented on KAFKA-8737:
--

Hi Bill, is there a plan to fix this issue, similar to the earlier fix for KAFKA-6269?

> TaskMigrated Exception while rebalancing kafka streams
> --
>
> Key: KAFKA-8737
> URL: https://issues.apache.org/jira/browse/KAFKA-8737
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0, 1.0.1
> Environment: 20 partitions 
> 1 topic 
> 8 Streamer service 
> topic-region-1 9  7841726 8236017 
> 394291 
> streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663/
>  
> streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer
> topic-region-1 15 7421710 7467666 
> 45956  
> streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663/
>  
> streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer
> topic-region-1 19 7737360 8120611 
> 383251 
> streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663/
> 
> streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer
> topic-region-1
>Reporter: KUMAR
>Assignee: Bill Bejeck
>Priority: Major
>
> Kafka Streams throws the following exception while restarting a stream client 
> service: 
> o.a.k.s.p.internals.StreamThread.? - stream-thread 
> [streams-subscriberstopic-region-1-32d968e3-f892-4772-a7a4-6f684d7e43c9-StreamThread-1]
>  Detected a task that got migrated to another thread. This implies that this 
> thread missed a rebalance and dropped out of the consumer group. Trying to 
> rejoin the consumer group now.
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
> topic-region-1-12 should not change while restoring: old end offset 6286727, 
> current offset 6380997
>  
> Kafka version is 1.0.0 and we have back-merged the fix for KAFKA-6269: 
> [https://github.com/apache/kafka/pull/4300/files#|https://github.com/apache/kafka/pull/4300/files]
> However, we observe that there seems to be an issue during rebalance when 
> "auto.offset.reset" is configured as "latest". Based on log analysis we see the 
> following behavior: 
>  # StreamThread starts a restore consumer 
>  # While fetching, it gets an offset-out-of-range error:                      
> o.a.k.c.consumer.internals.Fetcher.? - [Consumer 
> clientId=streams-subscriberstopic-region-1-11b2d7fb-11ce-4b0b-a40a-388d3c7b6bc9-StreamThread-1-restore-
>  consumer, groupId=] Fetch READ_UNCOMMITTED at offset 246431 for partition 
> topic-region-1-12 returned fetch data (error=OFFSET_OUT_OF_RANGE, 
> highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1,
>  abortedTransactions = null, recordsSizeInBytes=0) 
>  # The Fetcher tries to reset the offset 
>  # While resetting the offset, it appears to change the offset position, 
> causing the TaskMigrated exception
> The same test repeated with "auto.offset.reset" configured as "earliest" does 
> not throw any TaskMigrated exception, since in the earliest case we are not 
> resetting the restore consumer position.
>  
> Please let us know if this analysis is plausible and whether a fix is needed 
> for the offset reset logic when set to latest.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8558) KIP-479 - Add StreamJoined Overload to KStream#Join

2019-10-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8558:
---
Description: 
To prevent a topology incompatibility in the 2.4 release caused by the naming 
of join operations, we'll add an overloaded KStream#join method accepting a 
Materialized parameter. This will allow users to explicitly name state stores 
created by Kafka Streams in the join operation.

The overloads will apply to all flavors of KStream#join (inner, left, and 
right). 

KIP-479: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join

  was:
To prevent a topology incompatibility in the 2.4 release caused by the naming 
of join operations, we'll add an overloaded KStream#join method accepting a 
Materialized parameter. This will allow users to explicitly name state stores 
created by Kafka Streams in the join operation.

 

The overloads will apply to all flavors of KStream#join (inner, left, and 
right). 


> KIP-479 - Add StreamJoined Overload to KStream#Join 
> 
>
> Key: KAFKA-8558
> URL: https://issues.apache.org/jira/browse/KAFKA-8558
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Blocker
>  Labels: kip
> Fix For: 2.4.0
>
>
> To prevent a topology incompatibility in the 2.4 release caused by the naming 
> of join operations, we'll add an overloaded KStream#join method accepting a 
> Materialized parameter. This will allow users to explicitly name state stores 
> created by Kafka Streams in the join operation.
> The overloads will apply to all flavors of KStream#join (inner, left, and 
> right). 
> KIP-479: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join
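The quoted change can be illustrated with a sketch against the 2.4 Streams API; the topic names, serdes, and store name below are made-up placeholders, not part of the KIP:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

public class StreamJoinedExample {
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-topic");   // placeholder topic
        KStream<String, String> right = builder.stream("right-topic"); // placeholder topic

        // Naming the join state stores explicitly keeps the topology (and thus
        // the changelog topic names) stable across application upgrades.
        left.join(
            right,
            (l, r) -> l + ":" + r,
            JoinWindows.of(Duration.ofMinutes(5)),
            StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
                        .withStoreName("clicks-impressions-join")); // base name for both join stores

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

Requires kafka-streams on the classpath; the printed topology description shows the explicitly named stores.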



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9126) Extend `StreamJoined` to allow more store configs

2019-10-31 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9126:
--

 Summary: Extend `StreamJoined` to allow more store configs
 Key: KAFKA-9126
 URL: https://issues.apache.org/jira/browse/KAFKA-9126
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.4.0
Reporter: Matthias J. Sax


In the 2.4.0 release, we introduced the `StreamJoined` configuration object via 
KIP-479 (KAFKA-8558). The idea of `StreamJoined` is to be an equivalent to 
`Materialized` but customized for stream-stream joins, which have two stores 
(in contrast to the usage of `Materialized`, which is used for single-store 
operators).

During the KIP discussion, the idea of allowing users to set the store 
retention time and enable/disable changelogging for the store came up. However, 
at some point this idea was dropped for unknown reasons (it seems it slipped).

We should consider extending `StreamJoined` with `withRetentionPeriod()` and 
`loggingEnabled()`/`loggingDisabled()` methods to reach feature parity with 
`Materialized`.
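For comparison, the `Materialized` knobs the ticket wants parity with already exist today; a small sketch (the store name is a made-up placeholder, and this is not the proposed `StreamJoined` API):

```java
import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.WindowStore;

public class MaterializedParityExample {
    // The retention and changelogging settings that exist on Materialized
    // but are missing from StreamJoined in 2.4.0.
    static Materialized<String, Long, WindowStore<Bytes, byte[]>> example() {
        return Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("window-agg") // placeholder store name
                .withRetention(Duration.ofHours(12))  // per-store retention period
                .withLoggingDisabled();               // disable the changelog topic
    }

    public static void main(String[] args) {
        System.out.println("configured: " + example());
    }
}
```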



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8558) KIP-479 - Add StreamJoined Overload to KStream#Join

2019-10-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8558:
---
Labels: kip  (was: needs-kip)

> KIP-479 - Add StreamJoined Overload to KStream#Join 
> 
>
> Key: KAFKA-8558
> URL: https://issues.apache.org/jira/browse/KAFKA-8558
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Blocker
>  Labels: kip
> Fix For: 2.4.0
>
>
> To prevent a topology incompatibility in the 2.4 release caused by the naming 
> of join operations, we'll add an overloaded KStream#join method accepting a 
> Materialized parameter. This will allow users to explicitly name state stores 
> created by Kafka Streams in the join operation.
>  
> The overloads will apply to all flavors of KStream#join (inner, left, and 
> right). 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8558) KIP-479 - Add StreamJoined Overload to KStream#Join

2019-10-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8558:
---
Summary: KIP-479 - Add StreamJoined Overload to KStream#Join   (was: 
KIP-479 - Add Materialized Overload to KStream#Join )

> KIP-479 - Add StreamJoined Overload to KStream#Join 
> 
>
> Key: KAFKA-8558
> URL: https://issues.apache.org/jira/browse/KAFKA-8558
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 2.4.0
>
>
> To prevent a topology incompatibility in the 2.4 release caused by the naming 
> of join operations, we'll add an overloaded KStream#join method accepting a 
> Materialized parameter. This will allow users to explicitly name state stores 
> created by Kafka Streams in the join operation.
>  
> The overloads will apply to all flavors of KStream#join (inner, left, and 
> right). 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition

2019-10-31 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964218#comment-16964218
 ] 

Guozhang Wang commented on KAFKA-9073:
--

Thanks for finding this out. I can prepare a PR for 2.3 and try to cherry-pick it 
to older branches :)

> Kafka Streams State stuck in rebalancing after one of the StreamThread 
> encounters java.lang.IllegalStateException: No current assignment for 
> partition
> --
>
> Key: KAFKA-9073
> URL: https://issues.apache.org/jira/browse/KAFKA-9073
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: amuthan Ganeshan
>Priority: Major
> Fix For: 2.4.0
>
> Attachments: KAFKA-9073.log
>
>
> I have a Kafka Streams application that stores the incoming messages into a 
> state store; later, during the punctuation period, we store them into a 
> big data persistent store after processing the messages.
> The application consumes from 120 partitions distributed across 40 instances. 
> The application has been running fine without any problem for months, but all 
> of a sudden some of the instances failed because of a stream thread exception 
> saying  
> ```java.lang.IllegalStateException: No current assignment for partition 
> --changelog-98```
>  
> And other instances are stuck in the REBALANCING state and never come out 
> of it. Here is the full stack trace; I just masked the application-specific 
> app name and store name in the stack trace due to NDA.
>  
> ```
> 2019-10-21 13:27:13,481 ERROR 
> [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] 
> [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread 
> [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] 
> Encountered the following error during processing:
> java.lang.IllegalStateException: No current assignment for partition 
> application.id-store_name-changelog-98
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> ```
>  
> Now I checked the state store disk usage; it is less than 40% of the total 
> disk space available. Restarting the application solves the problem for a 
> short amount of time, but the error pops up again quickly on some other 
> instances. I tried to change the retries and retry.backoff.ms 
> configuration, but it was not helpful at all:
> ```
> retries = 2147483647
> retry.backoff.ms
> ```
> After googling for some time I found there was a similar bug reported to the 
> Kafka team in the past, and also noticed my stack trace exactly matches 
> the stack trace of the reported bug.
> Here is the link for the bug reported on 

[jira] [Created] (KAFKA-9125) GroupMetadataManager and TransactionStateManager should use metadata cache instead of zkClient

2019-10-31 Thread Viktor Somogyi-Vass (Jira)
Viktor Somogyi-Vass created KAFKA-9125:
--

 Summary: GroupMetadataManager and TransactionStateManager should 
use metadata cache instead of zkClient
 Key: KAFKA-9125
 URL: https://issues.apache.org/jira/browse/KAFKA-9125
 Project: Kafka
  Issue Type: Sub-task
Reporter: Viktor Somogyi-Vass


Both classes query their respective topic's partition count via the zkClient. 
This could, however, be served from the broker's local metadata cache.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-9124) ISR changes should be propagated via Kafka protocol

2019-10-31 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-9124:
---
Comment: was deleted

(was: Opened by mistake.)

> ISR changes should be propagated via Kafka protocol
> ---
>
> Key: KAFKA-9124
> URL: https://issues.apache.org/jira/browse/KAFKA-9124
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Priority: Major
>
> Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} update 
> ZooKeeper, which the controller listens to; that is how it notices the 
> ISR changes and sends out metadata requests.
> Instead, the brokers should use Kafka protocol messages to send out 
> ISR change notifications.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9124) ISR changes should be propagated via Kafka protocol

2019-10-31 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-9124:
---
Description: 
Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} update ZooKeeper, 
which the controller listens to; that is how it notices the ISR changes 
and sends out metadata requests.
Instead, the brokers should use Kafka protocol messages to send out ISR 
change notifications.

  was:
Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} update ZooKeeper, 
which the controller listens to; that is how it notices the ISR changes 
and sends out metadata requests.
Instead, the brokers should use Kafka protocol messages to send out isr 
change notifications.


> ISR changes should be propagated via Kafka protocol
> ---
>
> Key: KAFKA-9124
> URL: https://issues.apache.org/jira/browse/KAFKA-9124
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Priority: Major
>
> Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} update 
> ZooKeeper, which the controller listens to; that is how it notices the 
> ISR changes and sends out metadata requests.
> Instead, the brokers should use Kafka protocol messages to send out 
> ISR change notifications.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9124) ISR changes should be propagated via Kafka protocol

2019-10-31 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-9124:
---
Description: 
Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} update ZooKeeper, 
which the controller listens to; that is how it notices the ISR changes 
and sends out metadata requests.
Instead, the brokers should use Kafka protocol messages to send out isr 
change notifications.

  was:Currently {{KafkaApis}} invokes the {{adminManager}} (which essentially 
wraps the zkClient) to create a topic. Instead of this, we should create a 
protocol that sends a request to the broker and listens for confirmation.


> ISR changes should be propagated via Kafka protocol
> ---
>
> Key: KAFKA-9124
> URL: https://issues.apache.org/jira/browse/KAFKA-9124
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Priority: Major
>
> Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} update 
> ZooKeeper, which the controller listens to; that is how it notices the 
> ISR changes and sends out metadata requests.
> Instead, the brokers should use Kafka protocol messages to send out 
> isr change notifications.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-9124) Topic creation should be done by the controller

2019-10-31 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass reopened KAFKA-9124:


> Topic creation should be done by the controller
> ---
>
> Key: KAFKA-9124
> URL: https://issues.apache.org/jira/browse/KAFKA-9124
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Priority: Major
>
> Currently {{KafkaApis}} invokes the {{adminManager}} (which essentially wraps 
> the zkClient) to create a topic. Instead of this, we should create a protocol 
> that sends a request to the broker and listens for confirmation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9124) ISR changes should be propagated via Kafka protocol

2019-10-31 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-9124:
---
Summary: ISR changes should be propagated via Kafka protocol  (was: Topic 
creation should be done by the controller)

> ISR changes should be propagated via Kafka protocol
> ---
>
> Key: KAFKA-9124
> URL: https://issues.apache.org/jira/browse/KAFKA-9124
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Priority: Major
>
> Currently {{KafkaApis}} invokes the {{adminManager}} (which essentially wraps 
> the zkClient) to create a topic. Instead of this, we should create a protocol 
> that sends a request to the broker and listens for confirmation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9124) Topic creation should be done by the controller

2019-10-31 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass resolved KAFKA-9124.

Resolution: Invalid

Opened by mistake.

> Topic creation should be done by the controller
> ---
>
> Key: KAFKA-9124
> URL: https://issues.apache.org/jira/browse/KAFKA-9124
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Priority: Major
>
> Currently {{KafkaApis}} invokes the {{adminManager}} (which essentially wraps 
> the zkClient) to create a topic. Instead of this, we should create a protocol 
> that sends a request to the broker and listens for confirmation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9122) Externalizing DB password is not working

2019-10-31 Thread Dwijadas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964122#comment-16964122
 ] 

Dwijadas commented on KAFKA-9122:
-

Hi
After removing the quotes from the credential file, the connector managed to 
connect to the DB.

 

{{$ cat /home/kfk/data/ora_credentials.properties
ora.username=apps
ora.password=Passw0rd!}}
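This outcome makes sense: `FileConfigProvider` appears to read the file as a standard `java.util.Properties` file, and `Properties` treats quote characters as literal parts of the value, so `ora.password="Passw0rd!"` yields a password that includes the quotes. A minimal stdlib-only demonstration (key names are from the report above):

```java
import java.io.StringReader;
import java.util.Properties;

public class QuotedPropsDemo {
    public static void main(String[] args) throws Exception {
        Properties p = new Properties();
        // Same format FileConfigProvider loads from disk; quotes are NOT stripped.
        p.load(new StringReader("ora.username=\"apps\"\nora.password=\"Passw0rd!\"\n"));

        System.out.println(p.getProperty("ora.username")); // prints "apps" including the quotes
        // A DB would then reject the quoted credentials, e.g. ORA-01017.
    }
}
```

Removing the quotes from the file, as done above, is the right fix.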

> Externalizing DB password is not working
> 
>
> Key: KAFKA-9122
> URL: https://issues.apache.org/jira/browse/KAFKA-9122
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.1
> Environment: CentOS 6.7
>Reporter: Dwijadas
>Priority: Trivial
> Attachments: Screenshot_1.png
>
>
> Hi
> I am trying to externalize the user name and password for an Oracle DB using 
> the {{FileConfigProvider}} provider.
> For that I have created a properties file that contains the user name and 
> password.
>  
> {{$ cat /home/kfk/data/ora_credentials.properties
> ora.username="apps"
> ora.password="Passw0rd!"}}
> Added the config providers as file and also the config.providers.file.class 
> as FileConfigProvider in the worker config:
>  
> {{$ cat /home/kfk/etc/kafka/connect-distributed.properties
> ...
> ...
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
> ...
> ...}}
> Restarted the worker and submitted a task using REST with the following config
>  
> {{"config": \{
>"connector.class": 
> "io.confluent.connect.jdbc.JdbcSourceConnector",
>"tasks.max": "1",
>  "connection.user": 
> "${file:/home/kfk/data/ora_credentials.properties:ora.username}",
>"connection.password": 
> "${file:/home/kfk/data/ora_credentials.properties:ora.password}",
>...
>...
> }}}
> Submitting the above task results in the following error:
>  
> {{{
>   "error_code": 400,
>   "message": "Connector configuration is invalid and contains the following 2 
> error(s):\nInvalid value java.sql.SQLException: ORA-01017: invalid 
> username/password; logon denied\n for configuration Couldn't open connection 
> to jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nInvalid value 
> java.sql.SQLException: ORA-01017: invalid username/password; logon denied\n 
> for configuration Couldn't open connection to 
> jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nYou can also find the 
> above list of errors at the endpoint `/\{connectorType}/config/validate`"
> }}}
> It appears the above config does not replace the user name and password at 
> all; rather, the entire placeholder values for connection.user and 
> connection.password are used to connect to the DB, resulting in the 
> ORA-01017: invalid username/password error.
> Is it a bug ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9122) Externalizing DB password is not working

2019-10-31 Thread Dwijadas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964095#comment-16964095
 ] 

Dwijadas commented on KAFKA-9122:
-

In the distributed connector log, I can see the value of the username is 
correctly replaced from the properties file and the password is hidden. But 
there is still a 400 error code about invalid values?

 

!Screenshot_1.png!

> Externalizing DB password is not working
> 
>
> Key: KAFKA-9122
> URL: https://issues.apache.org/jira/browse/KAFKA-9122
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.1
> Environment: CentOS 6.7
>Reporter: Dwijadas
>Priority: Trivial
> Attachments: Screenshot_1.png
>
>
> Hi
> I am trying to externalize the user name and password for an Oracle DB using 
> the {{FileConfigProvider}} provider.
> For that I have created a properties file that contains the user name and 
> password.
>  
> {{$ cat /home/kfk/data/ora_credentials.properties
> ora.username="apps"
> ora.password="Passw0rd!"}}
> Added the config providers as file and also the config.providers.file.class 
> as FileConfigProvider in the worker config:
>  
> {{$ cat /home/kfk/etc/kafka/connect-distributed.properties
> ...
> ...
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
> ...
> ...}}
> Restarted the worker and submitted a task using REST with the following config
>  
> {{"config": \{
>"connector.class": 
> "io.confluent.connect.jdbc.JdbcSourceConnector",
>"tasks.max": "1",
>  "connection.user": 
> "${file:/home/kfk/data/ora_credentials.properties:ora.username}",
>"connection.password": 
> "${file:/home/kfk/data/ora_credentials.properties:ora.password}",
>...
>...
> }}}
> Submitting the above task results in the following error:
>  
> {{{
>   "error_code": 400,
>   "message": "Connector configuration is invalid and contains the following 2 
> error(s):\nInvalid value java.sql.SQLException: ORA-01017: invalid 
> username/password; logon denied\n for configuration Couldn't open connection 
> to jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nInvalid value 
> java.sql.SQLException: ORA-01017: invalid username/password; logon denied\n 
> for configuration Couldn't open connection to 
> jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nYou can also find the 
> above list of errors at the endpoint `/\{connectorType}/config/validate`"
> }}}
> It appears the above config does not replace the user name and password at 
> all; rather, the entire placeholder values for connection.user and 
> connection.password are used to connect to the DB, resulting in the 
> ORA-01017: invalid username/password error.
> Is it a bug ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9122) Externalizing DB password is not working

2019-10-31 Thread Dwijadas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dwijadas updated KAFKA-9122:

Attachment: Screenshot_1.png

> Externalizing DB password is not working
> 
>
> Key: KAFKA-9122
> URL: https://issues.apache.org/jira/browse/KAFKA-9122
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.1
> Environment: CentOS 6.7
>Reporter: Dwijadas
>Priority: Trivial
> Attachments: Screenshot_1.png
>
>
> Hi
> I am trying to externalize the user name and password for an Oracle DB using 
> the {{FileConfigProvider}} provider.
> For that I have created a properties file that contains the user name and 
> password.
>  
> {{$ cat /home/kfk/data/ora_credentials.properties
> ora.username="apps"
> ora.password="Passw0rd!"}}
> Added the config providers as file and also the config.providers.file.class 
> as FileConfigProvider in the worker config:
>  
> {{$ cat /home/kfk/etc/kafka/connect-distributed.properties
> ...
> ...
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
> ...
> ...}}
> Restarted the worker and submitted a task using REST with the following config
>  
> {{"config": \{
>"connector.class": 
> "io.confluent.connect.jdbc.JdbcSourceConnector",
>"tasks.max": "1",
>  "connection.user": 
> "${file:/home/kfk/data/ora_credentials.properties:ora.username}",
>"connection.password": 
> "${file:/home/kfk/data/ora_credentials.properties:ora.password}",
>...
>...
> }}}
> Submitting the above task results in the following error:
>  
> {{{
>   "error_code": 400,
>   "message": "Connector configuration is invalid and contains the following 2 
> error(s):\nInvalid value java.sql.SQLException: ORA-01017: invalid 
> username/password; logon denied\n for configuration Couldn't open connection 
> to jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nInvalid value 
> java.sql.SQLException: ORA-01017: invalid username/password; logon denied\n 
> for configuration Couldn't open connection to 
> jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nYou can also find the 
> above list of errors at the endpoint `/\{connectorType}/config/validate`"
> }}}
> It appears the above config does not replace the user name and password at 
> all; rather, the entire placeholder values for connection.user and 
> connection.password are used to connect to the DB, resulting in the 
> ORA-01017: invalid username/password error.
> Is it a bug ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9121) Mirror Maker 2.0 doesn't handle the topic names in consumer checkpoints properly when topic name contain separator

2019-10-31 Thread Jakub Scholz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964089#comment-16964089
 ] 

Jakub Scholz commented on KAFKA-9121:
-

I wondered whether the source/target cluster alias could be used to remove the 
prefix more elegantly, instead of the current implementation, which requires 
the separator to be unique. But TBH I don't know enough about how MM2 works 
when you chain multiple clusters, so I'm not sure it would be as easy as it was 
when I played with just two Kafka clusters.
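
The separator ambiguity can be illustrated with a toy resolver (a sketch only, not MM2's actual implementation; both function names are hypothetical):

```python
# Toy sketch (not MM2's real code) of why a separator that also appears
# inside original topic names breaks checkpoint topic-name parsing.
SEP = "-"

def upstream_topic_naive(remote_topic):
    # Stripping everything up to the LAST separator mis-parses topics
    # whose original name itself contains the separator.
    return remote_topic[remote_topic.rindex(SEP) + len(SEP):]

def upstream_topic_by_alias(remote_topic, source_alias):
    # Stripping the known "<alias><sep>" prefix recovers the full name,
    # regardless of separators inside the topic name.
    prefix = source_alias + SEP
    return remote_topic[len(prefix):] if remote_topic.startswith(prefix) else remote_topic

print(upstream_topic_naive("a-kafka-test-apps"))          # -> apps (the reported symptom)
print(upstream_topic_by_alias("a-kafka-test-apps", "a"))  # -> kafka-test-apps
```

The alias-prefix variant is what the comment above speculates about; chained clusters (where a remote topic carries several alias prefixes) would need more care.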

> Mirror Maker 2.0 doesn't handle the topic names in consumer checkpoints 
> properly when topic name contain separator
> --
>
> Key: KAFKA-9121
> URL: https://issues.apache.org/jira/browse/KAFKA-9121
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Jakub Scholz
>Priority: Major
>
> I was trying Kafka Mirror Maker 2.0 and ran into the following situation:
> 1) I have 2 Kafka clusters, each with a {{kafka-test-apps}} topic
> 2) I configured Mirror Maker with {{replication.policy.separator=-}} and with 
> mirroring between clusters {{a}} and {{b}}.
> 3) When running Mirror Maker, the mirroring of topics works fine. But when I 
> use {{RemoteClusterUtils}} to recover the offsets, the names of the topics 
> for which offsets are found are {{a-kafka-test-apps}} and {{apps}}, while the 
> expected topic names would be {{a-kafka-test-apps}} and {{kafka-test-apps}}.
> I tried to find the issue, but haven't found it so far. It doesn't seem to be 
> in {{RemoteClusterUtils}}, because the topic names are already wrong in the 
> {{checkpoints.internal}} topic. So they are probably already processed in the 
> wrong way in the source cluster.
> When I use {{.}} as the separator, it seems to work fine for me. It looks 
> like the problem occurs only when the original topic name already contains 
> the separator. But choosing the right separator might not be a general 
> solution, because topics may contain all kinds of characters, so the problem 
> can always reoccur.





[jira] [Commented] (KAFKA-9120) Convert ./bin/kafka-reassign-partitions.sh to use KIP-455

2019-10-31 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964077#comment-16964077
 ] 

Stanislav Kozlovski commented on KAFKA-9120:


This is tracked in https://issues.apache.org/jira/browse/KAFKA-8820

> Convert ./bin/kafka-reassign-partitions.sh to use KIP-455
> -
>
> Key: KAFKA-9120
> URL: https://issues.apache.org/jira/browse/KAFKA-9120
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin McCabe
>Priority: Major
>
> We should convert ./bin/kafka-reassign-partitions.sh to use the KIP-455 API.  
> The KIP was already approved and the server-side work is done.





[jira] [Commented] (KAFKA-9123) Add system test with large number of partitions

2019-10-31 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964017#comment-16964017
 ] 

ASF GitHub Bot commented on KAFKA-9123:
---

mumrah commented on pull request #7621: KAFKA-9123 Test a large number of 
replicas
URL: https://github.com/apache/kafka/pull/7621
 
 
   TBD
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add system test with large number of partitions
> ---
>
> Key: KAFKA-9123
> URL: https://issues.apache.org/jira/browse/KAFKA-9123
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 2.5.0
>
>
> Let's add a system test with several thousand replicas and do some validation.





[jira] [Created] (KAFKA-9123) Add system test with large number of partitions

2019-10-31 Thread David Arthur (Jira)
David Arthur created KAFKA-9123:
---

 Summary: Add system test with large number of partitions
 Key: KAFKA-9123
 URL: https://issues.apache.org/jira/browse/KAFKA-9123
 Project: Kafka
  Issue Type: Test
  Components: system tests
Reporter: David Arthur
Assignee: David Arthur
 Fix For: 2.5.0


Let's add a system test with several thousand replicas and do some validation.





[jira] [Commented] (KAFKA-6542) Tables should trigger joins too, not just streams

2019-10-31 Thread Chad Preisler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963970#comment-16963970
 ] 

Chad Preisler commented on KAFKA-6542:
--

[~mjsax] Thank you for the clarification. I was looking at the issue only from 
the perspective of bootstrapping the KTables on startup. The code appears to 
reach State.RUNNING only when all KTable partitions have been restored. It also 
looks like the partitions are only "restored" when they reach the endOffsets 
that were set when the application started.

> Tables should trigger joins too, not just streams
> -
>
> Key: KAFKA-6542
> URL: https://issues.apache.org/jira/browse/KAFKA-6542
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Priority: Major
>
> At the moment it's quite possible to have a race condition when joining a 
> stream with a table, if the stream event arrives first, before the table 
> event, in which case the join will fail.
> This is also related to bootstrapping KTables (which is what a GKTable does).
> Related to: KAFKA-4113 Allow KTable bootstrap





[jira] [Commented] (KAFKA-9034) kafka-run-class.sh will fail if JAVA_HOME has space

2019-10-31 Thread Xu JianHai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963923#comment-16963923
 ] 

Xu JianHai commented on KAFKA-9034:
---

Newer IntelliJ versions use "_" in place of the blank space in the bundled Java path.

> kafka-run-class.sh will fail if JAVA_HOME has space
> ---
>
> Key: KAFKA-9034
> URL: https://issues.apache.org/jira/browse/KAFKA-9034
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
> Environment: macOS
>Reporter: Fenjin Wang
>Priority: Minor
>  Labels: easyfix, newbie
>
> If set JAVA_HOME to IntelliJ's java, bin/zookeeper-server-start.sh can't work 
> because the path has space in it.
>  
> {quote}export JAVA_HOME="/Applications/IntelliJ 
> IDEA.app/Contents/jbr/Contents/Home/"
> {quote}
>  
> We can fix this by quote "$JAVA" in the shell script according to: 
> [https://stackoverflow.com/a/7740746/1203241]
>  
> I can send a PR if you like.
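
A minimal, standalone demonstration of the word-splitting problem (a sketch, not the actual kafka-run-class.sh):

```shell
#!/bin/sh
# Sketch: why an unquoted $JAVA breaks when JAVA_HOME contains a space.
JAVA_HOME="/Applications/IntelliJ IDEA.app/Contents/jbr/Contents/Home"
JAVA="$JAVA_HOME/bin/java"

set -- $JAVA          # unquoted: the shell word-splits on the space
unquoted_words=$#

set -- "$JAVA"        # quoted: the path stays a single word
quoted_words=$#

echo "unquoted=$unquoted_words quoted=$quoted_words"
# prints: unquoted=2 quoted=1
```

Quoting every expansion of "$JAVA" in the script, as the Stack Overflow answer suggests, avoids the split.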





[jira] [Assigned] (KAFKA-9034) kafka-run-class.sh will fail if JAVA_HOME has space

2019-10-31 Thread Xu JianHai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu JianHai reassigned KAFKA-9034:
-

Assignee: (was: Xu JianHai)

> kafka-run-class.sh will fail if JAVA_HOME has space
> ---
>
> Key: KAFKA-9034
> URL: https://issues.apache.org/jira/browse/KAFKA-9034
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
> Environment: macOS
>Reporter: Fenjin Wang
>Priority: Minor
>  Labels: easyfix, newbie
>
> If set JAVA_HOME to IntelliJ's java, bin/zookeeper-server-start.sh can't work 
> because the path has space in it.
>  
> {quote}export JAVA_HOME="/Applications/IntelliJ 
> IDEA.app/Contents/jbr/Contents/Home/"
> {quote}
>  
> We can fix this by quote "$JAVA" in the shell script according to: 
> [https://stackoverflow.com/a/7740746/1203241]
>  
> I can send a PR if you like.





[jira] [Created] (KAFKA-9122) Externalizing DB password is not working

2019-10-31 Thread Dwijadas (Jira)
Dwijadas created KAFKA-9122:
---

 Summary: Externalizing DB password is not working
 Key: KAFKA-9122
 URL: https://issues.apache.org/jira/browse/KAFKA-9122
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.2.1
 Environment: CentOS 6.7
Reporter: Dwijadas


Hi,
I am trying to externalize the username and password for an Oracle DB using 
the {{FileConfigProvider}} provider.

For that I have created a properties file that contains the username and 
password.

 

{{$ cat /home/kfk/data/ora_credentials.properties
ora.username="apps"
ora.password="Passw0rd!"}}

I added file as a config provider and set config.providers.file.class to 
FileConfigProvider in the worker config:

 

{{$ cat /home/kfk/etc/kafka/connect-distributed.properties
...
...
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
...
...}}

I restarted the worker and submitted a task via REST with the following config:

 

{{"config": \{
   "connector.class": 
"io.confluent.connect.jdbc.JdbcSourceConnector",
   "tasks.max": "1",
   "connection.user": 
"${file:/home/kfk/data/ora_credentials.properties:ora.username}",
   "connection.password": 
"${file:/home/kfk/data/ora_credentials.properties:ora.password}",
   ...
   ...
}}}

Submitting the above task results in the following error:

 

{{{
  "error_code": 400,
  "message": "Connector configuration is invalid and contains the following 2 
error(s):\nInvalid value java.sql.SQLException: ORA-01017: invalid 
username/password; logon denied\n for configuration Couldn't open connection to 
jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nInvalid value 
java.sql.SQLException: ORA-01017: invalid username/password; logon denied\n for 
configuration Couldn't open connection to 
jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nYou can also find the above 
list of errors at the endpoint `/\{connectorType}/config/validate`"
}}}

It appears the above config does not replace the username and password at all; 
rather, the entire unresolved values of connection.user and connection.password 
are used to connect to the DB, resulting in the ORA-01017: invalid 
username/password error.

Is this a bug?
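
For illustration, here is a toy resolver for the `${file:<path>:<key>}` placeholder syntax (a sketch only, not Connect's actual FileConfigProvider code; the dict stands in for reading the properties file):

```python
import re

# Toy sketch of Connect-style "${file:<path>:<key>}" resolution.
PATTERN = re.compile(r"\$\{file:([^:}]+):([^}]+)\}")

def resolve(value, files):
    """Replace each ${file:path:key} placeholder with files[path][key]."""
    return PATTERN.sub(lambda m: files[m.group(1)][m.group(2)], value)

# Note: a Java properties file does NOT strip surrounding quotes, so a line
# like ora.username="apps" yields the six-character value '"apps"'.
files = {"/home/kfk/data/ora_credentials.properties": {"ora.username": '"apps"'}}
resolved = resolve(
    "${file:/home/kfk/data/ora_credentials.properties:ora.username}", files)
print(resolved)  # -> "apps" (quotes included)
```

One thing worth ruling out alongside the provider question: since properties parsing keeps the quotes, the credentials sent to Oracle would contain literal `"` characters, which by itself produces ORA-01017.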





[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-10-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963785#comment-16963785
 ] 

Matthias J. Sax commented on KAFKA-8769:


I have not read the KIP yet, but I personally don't think it's a good idea, 
because flushing `suppress()` for "final window emit" based on wall-clock time 
breaks the underlying idea of the suppress contract.

For regular "rate limiting" based on wall-clock time, there is already a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time

Last, wall-clock-time-based rate control is not related to this Jira ticket at 
all; they are two independent/orthogonal issues. Why do we need KIP-539, i.e., 
what is different from KIP-424? In fact, skimming over the KIP, it seems that 
you might have copied from KIP-424 anyway?

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.
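
The record sequence quoted above can be simulated to contrast partition-level stream time (the current behavior) with the proposed per-key tracking (a sketch, not Streams internals):

```python
# Sketch: partition-level vs hypothetical per-key "stream time" on the
# A/B record sequence from the ticket description.
records = [("A", 0), ("A", 1), ("A", 2), ("A", 3),
           ("B", 0), ("B", 1), ("B", 2), ("B", 3)]

partition_time = -1
per_key_time = {}
partition_trace, per_key_trace = [], []
for key, ts in records:
    partition_time = max(partition_time, ts)                 # highest ts on the partition
    per_key_time[key] = max(per_key_time.get(key, -1), ts)   # highest ts per key
    partition_trace.append(partition_time)
    per_key_trace.append(per_key_time[key])

print(partition_trace)  # [0, 1, 2, 3, 3, 3, 3, 3] -- B's records all look "late"
print(per_key_trace)    # [0, 1, 2, 3, 0, 1, 2, 3] -- each key advances independently
```

Under partition-level time, every B record arrives at stream time 3 and is dropped once the grace period is smaller than the skew; per-key time would let B's window advance on its own.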





[jira] [Commented] (KAFKA-6542) Tables should trigger joins too, not just streams

2019-10-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963776#comment-16963776
 ] 

Matthias J. Sax commented on KAFKA-6542:


[~cpreisler], it's unfortunately not that easy. In fact, `StreamThread` will 
not process any stream-side records before it goes to state RUNNING. However, 
during processing, the table might get updated at any point in time, and if 
there is out-of-order data, a stream-side record might join with incorrect 
table state.

That said, the Jira title is misleading anyway, because a table update can 
never trigger a join computation: conceptually, the stream side is empty. 
Hence, a join can only happen when a stream record is processed.

Also note that if there is no out-of-order data, Kafka Streams ensures that 
the table is updated first if the table record has a smaller timestamp than 
the stream record (cf. KAFKA-3514).

Fixing the out-of-order data issue is a larger and more complex change and 
thus would even require a KIP.

> Tables should trigger joins too, not just streams
> -
>
> Key: KAFKA-6542
> URL: https://issues.apache.org/jira/browse/KAFKA-6542
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Priority: Major
>
> At the moment it's quite possible to have a race condition when joining a 
> stream with a table, if the stream event arrives first, before the table 
> event, in which case the join will fail.
> This is also related to bootstrapping KTables (which is what a GKTable does).
> Related to: KAFKA-4113 Allow KTable bootstrap





[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext

2019-10-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963757#comment-16963757
 ] 

Matthias J. Sax commented on KAFKA-9088:


[~ableegoldman] I have no objections – as long as it's not leaking into the 
public API, I am fine with it.

Given the current "flood" of related tickets that reveal the internal mess 
with regard to those interfaces, it's unclear to me atm how a "clean" fix 
would look.

> Consolidate InternalMockProcessorContext and MockInternalProcessorContext
> -
>
> Key: KAFKA-9088
> URL: https://issues.apache.org/jira/browse/KAFKA-9088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Bruno Cadonna
>Priority: Minor
>  Labels: newbie
>
> Currently, we have two mocks for the {{InternalProcessorContext}}. The goal 
> of this ticket is to merge both into one mock or replace it with an 
> {{EasyMock}} mock. 


