[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
showuon commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-671191319

@kkonstantine, could you help review this PR to improve logging? Thanks.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
showuon commented on a change in pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#discussion_r467713932

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ##
@@ -363,6 +363,7 @@ private boolean sendRecords() {
 try {
     maybeCreateTopic(record.topic());
     final String topic = producerRecord.topic();
+    log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

Review comment:
With trace logging enabled, the user will see that the producer is about to send a record before producer.send gets stuck.
[jira] [Comment Edited] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174105#comment-17174105 ]

Luke Chen edited comment on KAFKA-10340 at 8/10/20, 6:45 AM:

After investigation: if {{auto.create.topics.enable}} is disabled, producer.send only gets a {{TimeoutException}}, with no other useful clue for users. So I will improve the logging in this area. I propose adding 2 logs:

1. When topic creation is disabled or the topic already exists:
log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:
log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

As for the request to let the user know that the connector is stuck waiting for the destination topic to be created: this basically cannot be determined from the client side, because producer.send blocks in the waitOnMetadata method, which keeps retrying until it times out. There are many possible reasons for this timeout, so it is hard to tell which one applies. Dynamically calling {{describeConfigs}} to get the broker setting is also not easy, because the broker name (which we need for describeConfigs) is not kept in the config, and no other place in Kafka checks the broker setting before doing something.

I'd prefer to keep it as is, because this behavior (whether or not the topic is auto-created during producer.send) applies to all of Kafka, not only to connectors.

Thanks.

was (Author: showuon):
After investigation, I will improve the logging in this area. I propose adding 2 logs:

1. When topic creation is disabled or the topic already exists:
log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:
log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

As for the request to let the user know that the connector is stuck waiting for the destination topic to be created: this basically cannot be determined from the client side, because producer.send blocks in the waitOnMetadata method, which keeps retrying until it times out. There are many possible reasons for this timeout, so it is hard to tell which one applies. Dynamically calling {{describeConfigs}} to get the broker setting is also not easy, because the broker name (which we need for describeConfigs) is not kept in the config, and no other place in Kafka checks the broker setting before doing something.

I'd prefer to keep it as is, because this behavior (whether or not the topic is auto-created during producer.send) applies to all of Kafka, not only to connectors.

Thanks.

> Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Arjun Satish
> Assignee: Luke Chen
> Priority: Major
>
> Currently, a source connector will blindly attempt to write a record to a Kafka topic. When the topic does not exist, its creation is controlled by the {{auto.create.topics.enable}} config on the brokers. When auto.create is disabled, the producer.send() call on the Connect worker will hang indefinitely (due to the "infinite retries" configuration for said producer). In setups where this config is usually disabled, the source connector simply appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user somehow) that the connector is simply stuck waiting for the destination topic to be created. When the worker has permissions to inspect the broker settings, it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to check if the topic exists, the broker can {{auto.create.topics.enable}} topics, and if these cases do not exist, either throw an error.
> With the recently merged [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics], this becomes even more specifi
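The comment above says that producer.send blocks while waiting for topic metadata and eventually surfaces only a timeout. A minimal broker-free sketch of that pattern follows. Everything in it is hypothetical: the class `MetadataWaitSketch`, the method `waitForTopic`, and the parameter names are illustration only (`maxBlockMs` merely echoes the producer's `max.block.ms` setting, on the assumption that this is the bound involved), and none of it is Kafka's actual code.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a bounded "wait until the topic shows up in metadata" loop.
final class MetadataWaitSketch {

    /**
     * Polls topicKnown until it returns true or maxBlockMs has elapsed.
     * Returns the number of polls it took; throws TimeoutException otherwise.
     */
    static int waitForTopic(BooleanSupplier topicKnown, long maxBlockMs, long retryBackoffMs)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + maxBlockMs * 1_000_000L;
        int attempts = 0;
        while (true) {
            attempts++;
            if (topicKnown.getAsBoolean()) {
                return attempts;
            }
            if (System.nanoTime() >= deadline) {
                // The caller only learns that time ran out, not why the topic is missing
                // (auto-creation disabled, broker unreachable, missing ACLs, ...).
                throw new TimeoutException("topic not seen in metadata after " + maxBlockMs + " ms");
            }
            Thread.sleep(retryBackoffMs);
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            waitForTopic(() -> false, 50, 10); // a topic that never appears
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

The point of the sketch is the exception site: the loop has a single failure exit, so every cause of a missing topic collapses into the same timeout, which is why the comment argues for better logging before the send rather than diagnosis after it.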
[GitHub] [kafka] showuon opened a new pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
showuon opened a new pull request #9149:
URL: https://github.com/apache/kafka/pull/9149

Improve the logging in `maybeCreateTopic` so that, when `!topicCreation.isTopicCreationRequired(topic)`, the user knows we won't create the topic because the topic creation setting is disabled or the topic already exists. We should also let the user know that, if the topic doesn't exist, we rely on the broker-side `auto.create.topics.enable` setting to decide whether the topic can be auto-created. Otherwise, if `auto.create.topics.enable` is disabled, producer.send only gets a `TimeoutException`, with no other useful clue for users.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
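The guard described in the pull request text can be sketched in plain Java. This is a simplified stand-in, not the Connect runtime code: the class `TopicCreationGuardSketch`, its in-memory `knownTopics` set, and the `debugLog` list (used here instead of a real logger so the behavior is observable) are all assumptions made for illustration. Only the method names `maybeCreateTopic` and `isTopicCreationRequired` and the log wording are taken from the thread.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, self-contained model of "log when we decide not to create the topic".
final class TopicCreationGuardSketch {
    private final boolean topicCreationEnabled;
    private final Set<String> knownTopics = new HashSet<>();
    // Stand-in for log.debug(...) so the sketch has no logging dependency.
    final List<String> debugLog = new ArrayList<>();

    TopicCreationGuardSketch(boolean topicCreationEnabled) {
        this.topicCreationEnabled = topicCreationEnabled;
    }

    // Assumed semantics: creation is needed only if the feature is on and the topic is not yet known.
    boolean isTopicCreationRequired(String topic) {
        return topicCreationEnabled && !knownTopics.contains(topic);
    }

    /** Returns true if this call "created" the topic, false if creation was skipped. */
    boolean maybeCreateTopic(String topic) {
        if (!isTopicCreationRequired(topic)) {
            debugLog.add("The topic creation setting is disabled or the topic name " + topic
                    + " is already created. If the topic doesn't exist, we'll rely on the"
                    + " auto.create.topics.enable setting in broker side"
                    + " to see if the topic can be auto created or not");
            return false;
        }
        knownTopics.add(topic); // stands in for the real topic-creation call
        return true;
    }

    public static void main(String[] args) {
        TopicCreationGuardSketch guard = new TopicCreationGuardSketch(false);
        System.out.println("created=" + guard.maybeCreateTopic("orders"));
        System.out.println(guard.debugLog.get(0));
    }
}
```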
[jira] [Comment Edited] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174105#comment-17174105 ]

Luke Chen edited comment on KAFKA-10340 at 8/10/20, 6:40 AM:

After investigation, I will improve the logging in this area. I propose adding 2 logs:

1. When topic creation is disabled or the topic already exists:
log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:
log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

As for the request to let the user know that the connector is stuck waiting for the destination topic to be created: this basically cannot be determined from the client side, because producer.send blocks in the waitOnMetadata method, which keeps retrying until it times out. There are many possible reasons for this timeout, so it is hard to tell which one applies. Dynamically calling {{describeConfigs}} to get the broker setting is also not easy, because the broker name (which we need for describeConfigs) is not kept in the config, and no other place in Kafka checks the broker setting before doing something.

I'd prefer to keep it as is, because this behavior (whether or not the topic is auto-created during producer.send) applies to all of Kafka, not only to connectors.

Thanks.

was (Author: showuon):
After investigation, I will improve the logging in this area. I propose adding 2 logs:

1. When topic creation is disabled or the topic already exists:
log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:
log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

As for the request to let the user know that the connector is stuck waiting for the destination topic to be created: this basically cannot be determined from the client side, because producer.send blocks in the waitOnMetadata method, which keeps retrying until it times out. There are many possible reasons for this timeout, so it is hard to tell which one applies. Dynamically calling {{describeConfigs}} to get the broker setting is also not easy, because the broker name (which we need for describeConfigs) is not kept in the config, and no other place in Kafka checks the broker setting before doing something.

I'd prefer to keep it as is, because this behavior applies to all of Kafka, not only to connectors.

Thanks.

> Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Arjun Satish
> Assignee: Luke Chen
> Priority: Major
>
> Currently, a source connector will blindly attempt to write a record to a Kafka topic. When the topic does not exist, its creation is controlled by the {{auto.create.topics.enable}} config on the brokers. When auto.create is disabled, the producer.send() call on the Connect worker will hang indefinitely (due to the "infinite retries" configuration for said producer). In setups where this config is usually disabled, the source connector simply appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user somehow) that the connector is simply stuck waiting for the destination topic to be created. When the worker has permissions to inspect the broker settings, it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to check if the topic exists, the broker can {{auto.create.topics.enable}} topics, and if these cases do not exist, either throw an error.
> With the recently merged [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics], this becomes even more specific a corner case: when topic creation settings are enabled, the worker should handle the corner case where topic creation is disabled, {{auto.create.topics.enable}} is set to false and topic
[jira] [Comment Edited] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174105#comment-17174105 ]

Luke Chen edited comment on KAFKA-10340 at 8/10/20, 6:38 AM:

I will improve the logging in this area. I propose adding 2 logs:

1. When topic creation is disabled or the topic already exists:
log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:
log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

As for the request to let the user know that the connector is stuck waiting for the destination topic to be created: this basically cannot be determined from the client side, because producer.send blocks in the waitOnMetadata method, which keeps retrying until it times out. There are many possible reasons for this timeout, so it is hard to tell which one applies. Dynamically calling {{describeConfigs}} to get the broker setting is also not easy, because the broker name (which we need for describeConfigs) is not kept in the config, and no other place in Kafka checks the broker setting before doing something.

I'd prefer to keep it as is, because this behavior applies to all of Kafka, not only to connectors.

Thanks.

was (Author: showuon):
I will improve the logging in this area. I propose adding 3 logs:

1. When topic creation is disabled or the topic already exists:
log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:
log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

3. If the producer fails to send the record, the topic does not exist, and topic creation is disabled:
log.info("The reason of the error might be the server disabled the " +
    "auto.create.topics.enable setting, please check the broker setting.");

> Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Arjun Satish
> Assignee: Luke Chen
> Priority: Major
>
> Currently, a source connector will blindly attempt to write a record to a Kafka topic. When the topic does not exist, its creation is controlled by the {{auto.create.topics.enable}} config on the brokers. When auto.create is disabled, the producer.send() call on the Connect worker will hang indefinitely (due to the "infinite retries" configuration for said producer). In setups where this config is usually disabled, the source connector simply appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user somehow) that the connector is simply stuck waiting for the destination topic to be created. When the worker has permissions to inspect the broker settings, it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to check if the topic exists, the broker can {{auto.create.topics.enable}} topics, and if these cases do not exist, either throw an error.
> With the recently merged [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics], this becomes even more specific a corner case: when topic creation settings are enabled, the worker should handle the corner case where topic creation is disabled, {{auto.create.topics.enable}} is set to false and topic does not exist.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174105#comment-17174105 ]

Luke Chen edited comment on KAFKA-10340 at 8/10/20, 6:38 AM:

After investigation, I will improve the logging in this area. I propose adding 2 logs:

1. When topic creation is disabled or the topic already exists:
log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:
log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

As for the request to let the user know that the connector is stuck waiting for the destination topic to be created: this basically cannot be determined from the client side, because producer.send blocks in the waitOnMetadata method, which keeps retrying until it times out. There are many possible reasons for this timeout, so it is hard to tell which one applies. Dynamically calling {{describeConfigs}} to get the broker setting is also not easy, because the broker name (which we need for describeConfigs) is not kept in the config, and no other place in Kafka checks the broker setting before doing something.

I'd prefer to keep it as is, because this behavior applies to all of Kafka, not only to connectors.

Thanks.

was (Author: showuon):
I will improve the logging in this area. I propose adding 2 logs:

1. When topic creation is disabled or the topic already exists:
log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:
log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

As for the request to let the user know that the connector is stuck waiting for the destination topic to be created: this basically cannot be determined from the client side, because producer.send blocks in the waitOnMetadata method, which keeps retrying until it times out. There are many possible reasons for this timeout, so it is hard to tell which one applies. Dynamically calling {{describeConfigs}} to get the broker setting is also not easy, because the broker name (which we need for describeConfigs) is not kept in the config, and no other place in Kafka checks the broker setting before doing something.

I'd prefer to keep it as is, because this behavior applies to all of Kafka, not only to connectors.

Thanks.

> Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Arjun Satish
> Assignee: Luke Chen
> Priority: Major
>
> Currently, a source connector will blindly attempt to write a record to a Kafka topic. When the topic does not exist, its creation is controlled by the {{auto.create.topics.enable}} config on the brokers. When auto.create is disabled, the producer.send() call on the Connect worker will hang indefinitely (due to the "infinite retries" configuration for said producer). In setups where this config is usually disabled, the source connector simply appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user somehow) that the connector is simply stuck waiting for the destination topic to be created. When the worker has permissions to inspect the broker settings, it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to check if the topic exists, the broker can {{auto.create.topics.enable}} topics, and if these cases do not exist, either throw an error.
> With the recently merged [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics], this becomes even more specific a corner case: when topic creation settings are enabled, the worker should handle the corner case where topic creation is disabled, {{auto.create.topics.enable}} is set to false and topic does not exist.
[jira] [Commented] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174105#comment-17174105 ]

Luke Chen commented on KAFKA-10340:

I will improve the logging in this area. I propose adding 3 logs:

1. When topic creation is disabled or the topic already exists:
log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:
log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

3. If the producer fails to send the record, the topic does not exist, and topic creation is disabled:
log.info("The reason of the error might be the server disabled the " +
    "auto.create.topics.enable setting, please check the broker setting.");

> Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Arjun Satish
> Assignee: Luke Chen
> Priority: Major
>
> Currently, a source connector will blindly attempt to write a record to a Kafka topic. When the topic does not exist, its creation is controlled by the {{auto.create.topics.enable}} config on the brokers. When auto.create is disabled, the producer.send() call on the Connect worker will hang indefinitely (due to the "infinite retries" configuration for said producer). In setups where this config is usually disabled, the source connector simply appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user somehow) that the connector is simply stuck waiting for the destination topic to be created. When the worker has permissions to inspect the broker settings, it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to check if the topic exists, the broker can {{auto.create.topics.enable}} topics, and if these cases do not exist, either throw an error.
> With the recently merged [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics], this becomes even more specific a corner case: when topic creation settings are enabled, the worker should handle the corner case where topic creation is disabled, {{auto.create.topics.enable}} is set to false and topic does not exist.
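The issue description proposes that the worker check three things before sending: whether the topic exists, whether Connect-side topic creation applies, and whether the broker allows auto-creation. The decision itself can be written as a small pure function. The sketch below is an assumption-laden illustration, not what PR #9149 does (that PR only adds logging): the class `TopicCheckSketch`, the `Outcome` enum, and the three boolean inputs are hypothetical, and how those booleans would be obtained (for example via AdminClient's {{listTopics}} and {{describeConfigs}}) is deliberately left out.

```java
// Hypothetical decision table for "what should the worker do before producer.send()?".
final class TopicCheckSketch {

    enum Outcome {
        SEND,                       // topic already exists
        CREATE_THEN_SEND,           // Connect-side topic creation handles it
        RELY_ON_BROKER_AUTO_CREATE, // broker will create the topic on first use
        FAIL_FAST                   // nobody will create the topic: report an error instead of hanging
    }

    static Outcome decide(boolean topicExists,
                          boolean connectTopicCreationEnabled,
                          boolean brokerAutoCreateEnabled) {
        if (topicExists) {
            return Outcome.SEND;
        }
        if (connectTopicCreationEnabled) {
            return Outcome.CREATE_THEN_SEND;
        }
        if (brokerAutoCreateEnabled) {
            return Outcome.RELY_ON_BROKER_AUTO_CREATE;
        }
        return Outcome.FAIL_FAST;
    }

    public static void main(String[] args) {
        // The corner case from the issue: no topic, creation disabled on both sides.
        System.out.println(decide(false, false, false));
    }
}
```

The last branch is the corner case the issue singles out; the comment thread explains why obtaining `brokerAutoCreateEnabled` from the client side is the hard part.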
[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()
vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r467671774

## File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ##
@@ -499,7 +500,7 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder builder)
 topic,
 new ConsumedInternal<>(consumed),
 processorName,
-stateUpdateSupplier
+() -> ProcessorAdapter.adapt(stateUpdateSupplier.get())

Review comment:
Adapt the old API to the new one.

## File path: streams/src/main/java/org/apache/kafka/streams/Topology.java ##
@@ -721,7 +722,7 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder,
 valueDeserializer,
 topic,
 processorName,
-stateUpdateSupplier
+() -> ProcessorAdapter.adapt(stateUpdateSupplier.get())

Review comment:
Also here, adapting the old API to the new one.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java ##
@@ -201,12 +200,12 @@ public synchronized void addStateStore(final StoreBuilder builder) {
 addGraphNode(root, new StateStoreNode<>(builder));
 }

-public synchronized void addGlobalStore(final StoreBuilder storeBuilder,
-    final String sourceName,
-    final String topic,
-    final ConsumedInternal consumed,
-    final String processorName,
-    final ProcessorSupplier stateUpdateSupplier) {
+public synchronized void addGlobalStore(final StoreBuilder storeBuilder,
+    final String sourceName,
+    final String topic,
+    final ConsumedInternal consumed,
+    final String processorName,
+    final org.apache.kafka.streams.processor.api.ProcessorSupplier stateUpdateSupplier) {

Review comment:
For the internal builder, just directly change to the new API.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java ##
@@ -89,14 +90,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
 new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal>) materializedInternal).materialize();

 if (isGlobalKTable) {
-topologyBuilder.addGlobalStore(storeBuilder,
-    sourceName,
-    consumedInternal().timestampExtractor(),
-    consumedInternal().keyDeserializer(),
-    consumedInternal().valueDeserializer(),
-    topicName,
-    processorParameters.processorName(),
-    processorParameters.processorSupplier());
+topologyBuilder.addGlobalStore(
+    storeBuilder,
+    sourceName,
+    consumedInternal().timestampExtractor(),
+    consumedInternal().keyDeserializer(),
+    consumedInternal().valueDeserializer(),
+    topicName,
+    processorParameters.processorName(),
+    () -> ProcessorAdapter.adapt(processorParameters.processorSupplier().get())

Review comment:
For now, just adapt. Later this whole node will get converted to the new API.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##
@@ -192,16 +192,23 @@ private boolean isWindowStore() {
 }

 private static class ProcessorNodeFactory extends NodeFactory {
-private final ProcessorSupplier supplier;
+private final org.apache.kafka.streams.processor.api.ProcessorSupplier supplier;
 private final Set stateStoreNames = new HashSet<>();

 ProcessorNodeFactory(final String name,
     final String[] predecessors,
-    final ProcessorSupplier supplier) {
+    final org.apache.kafka.streams.processor.api.ProcessorSupplier supplier) {

Review comment:
We need to create a factory for the global node, so I'm adding the constructor for the new API, and also converting the internals here.

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTe
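The recurring change in this review, `() -> ProcessorAdapter.adapt(stateUpdateSupplier.get())`, wraps an old-API supplier so that each call yields an adapted new-API processor. A self-contained sketch of that shape follows. The interfaces `OldProcessor` and `NewProcessor`, the `KeyValueRecord` class, and the method bodies are hypothetical stand-ins invented for illustration; they are not the Kafka Streams types, and the real `ProcessorAdapter` may differ in every detail other than the wrapping pattern.

```java
import java.util.function.Supplier;

// Hypothetical model of "adapt the old API to the new one" at the supplier level.
final class ProcessorAdapterSketch {

    /** Stand-in for an old-style processor taking key and value separately. */
    interface OldProcessor<K, V> {
        void process(K key, V value);
    }

    /** Stand-in for a new-style processor taking a single record object. */
    interface NewProcessor<K, V> {
        void process(KeyValueRecord<K, V> record);
    }

    static final class KeyValueRecord<K, V> {
        final K key;
        final V value;

        KeyValueRecord(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Wraps one old processor instance so it can be used where a new one is expected. */
    static <K, V> NewProcessor<K, V> adapt(OldProcessor<K, V> delegate) {
        return record -> delegate.process(record.key, record.value);
    }

    /** Mirrors the lambda in the diff: a fresh adapted instance per get() call. */
    static <K, V> Supplier<NewProcessor<K, V>> adaptSupplier(Supplier<OldProcessor<K, V>> oldSupplier) {
        return () -> adapt(oldSupplier.get());
    }

    public static void main(String[] args) {
        Supplier<OldProcessor<String, Integer>> old =
                () -> (k, v) -> System.out.println(k + " -> " + v);
        adaptSupplier(old).get().process(new KeyValueRecord<>("a", 1));
    }
}
```

Adapting inside the supplier lambda, rather than adapting one instance up front, keeps the supplier contract intact: every `get()` still produces a new processor instance.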
[jira] [Created] (KAFKA-10380) Make dist flatten rocksdbjni
Adrian Cole created KAFKA-10380:
---

Summary: Make dist flatten rocksdbjni
Key: KAFKA-10380
URL: https://issues.apache.org/jira/browse/KAFKA-10380
Project: Kafka
Issue Type: Task
Components: build
Affects Versions: 2.6.0
Reporter: Adrian Cole

I was looking for ways to reduce the size of our Kafka image, and the most notable opportunity is handling rocksdbjni differently. It is currently a 15MB jar.

As mentioned in its description, rocksdbjni includes binaries for a lot of OS choices.

du -k librocksdbjni-*
7220 librocksdbjni-linux-aarch64.so
8756 librocksdbjni-linux-ppc64le.so
7220 librocksdbjni-linux32.so
7932 librocksdbjni-linux64.so
5440 librocksdbjni-osx.jnilib
4616 librocksdbjni-win64.dll

In normal dists, which aim to work on many operating systems, it may not be obvious why this is a problem. When creating Docker images, however, we currently need to repackage this jar to scrub out the irrelevant OS items, or accept files larger than Alpine itself.

While this might be something to kick back to rocksdb, having some options here would be great.
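To put numbers on the listing above, the following sketch sums the `du -k` figures and computes how much would be dropped by keeping a single native library. The sizes are copied from the issue text; the class and method names are hypothetical, and choosing `librocksdbjni-linux64.so` as the one to keep is only an assumption about a typical x86-64 Linux image. Note that these are unpacked sizes, so the saving inside the compressed 15MB jar would be a different figure.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Arithmetic on the du -k listing quoted in the issue (sizes in 1024-byte units).
final class RocksDbJniSizeSketch {

    static Map<String, Integer> nativeLibSizesKiB() {
        Map<String, Integer> sizes = new LinkedHashMap<>();
        sizes.put("librocksdbjni-linux-aarch64.so", 7220);
        sizes.put("librocksdbjni-linux-ppc64le.so", 8756);
        sizes.put("librocksdbjni-linux32.so", 7220);
        sizes.put("librocksdbjni-linux64.so", 7932);
        sizes.put("librocksdbjni-osx.jnilib", 5440);
        sizes.put("librocksdbjni-win64.dll", 4616);
        return sizes;
    }

    /** Total unpacked size of all bundled native libraries. */
    static int totalKiB(Map<String, Integer> sizes) {
        int total = 0;
        for (int size : sizes.values()) {
            total += size;
        }
        return total;
    }

    /** Unpacked size removed if only the library named keep is retained. */
    static int savedKiB(Map<String, Integer> sizes, String keep) {
        return totalKiB(sizes) - sizes.get(keep);
    }

    public static void main(String[] args) {
        Map<String, Integer> sizes = nativeLibSizesKiB();
        int total = totalKiB(sizes);
        int saved = savedKiB(sizes, "librocksdbjni-linux64.so");
        System.out.printf("total=%d KiB, saved=%d KiB (%.1f%%)%n", total, saved, 100.0 * saved / total);
    }
}
```

With the listed figures the total is 41184 KiB and keeping only the linux64 library drops 33252 KiB of unpacked binaries, roughly four fifths of the native payload.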
[GitHub] [kafka] vvcephei opened a new pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()
vvcephei opened a new pull request #9148:
URL: https://github.com/apache/kafka/pull/9148

From KIP-478, implement the new StreamBuilder#addGlobalStore() overload that takes a stateUpdateSupplier fully typed Processor. Where necessary, use the adapters to make the old APIs defer to the new ones, as well as limiting the scope of this change set.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-10379) Implement the KIP-478 StreamBuilder#addGlobalStore()
John Roesler created KAFKA-10379:

Summary: Implement the KIP-478 StreamBuilder#addGlobalStore()
Key: KAFKA-10379
URL: https://issues.apache.org/jira/browse/KAFKA-10379
Project: Kafka
Issue Type: Sub-task
Components: streams
Reporter: John Roesler
Assignee: John Roesler
[jira] [Updated] (KAFKA-10261) Introduce the KIP-478 processors with shims
[ https://issues.apache.org/jira/browse/KAFKA-10261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10261:
Fix Version/s: 2.7.0

> Introduce the KIP-478 processors with shims
> ---
>
> Key: KAFKA-10261
> URL: https://issues.apache.org/jira/browse/KAFKA-10261
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
> Fix For: 2.7.0
[jira] [Resolved] (KAFKA-10261) Introduce the KIP-478 processors with shims
[ https://issues.apache.org/jira/browse/KAFKA-10261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler resolved KAFKA-10261.
Resolution: Fixed

> Introduce the KIP-478 processors with shims
> ---
>
> Key: KAFKA-10261
> URL: https://issues.apache.org/jira/browse/KAFKA-10261
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
[GitHub] [kafka] vvcephei merged pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with adapters
vvcephei merged pull request #9004:
URL: https://github.com/apache/kafka/pull/9004
[GitHub] [kafka] vvcephei commented on pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with adapters
vvcephei commented on pull request #9004: URL: https://github.com/apache/kafka/pull/9004#issuecomment-671134513 Merged to trunk. Thanks for the review @abbccdda !
[GitHub] [kafka] vvcephei commented on pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with adapters
vvcephei commented on pull request #9004: URL: https://github.com/apache/kafka/pull/9004#issuecomment-671134339 Just one unrelated test failure: ``` org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/build/reports/testOutput/org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true].test.stdout org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true] FAILED java.lang.AssertionError: Expected: <[KeyValue(2, 0), KeyValue(2, 1), KeyValue(2, 3), KeyValue(2, 6), KeyValue(2, 10), KeyValue(2, 15), KeyValue(2, 21), KeyValue(2, 28), KeyValue(2, 36), KeyValue(2, 45), KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 78), KeyValue(2, 91), KeyValue(2, 105), KeyValue(2, 120), KeyValue(2, 136), KeyValue(2, 153), KeyValue(2, 171), KeyValue(2, 190), KeyValue(2, 120), KeyValue(2, 136), KeyValue(2, 153), KeyValue(2, 171), KeyValue(2, 190), KeyValue(2, 210), KeyValue(2, 231), KeyValue(2, 253), KeyValue(2, 276), KeyValue(2, 300), KeyValue(2, 325), KeyValue(2, 351), KeyValue(2, 378), KeyValue(2, 406), KeyValue(2, 435), KeyValue(2, 210), KeyValue(2, 231), KeyValue(2, 253), KeyValue(2, 276), KeyValue(2, 300), KeyValue(2, 325), KeyValue(2, 351), KeyValue(2, 378), KeyValue(2, 406), KeyValue(2, 435), KeyValue(2, 465), KeyValue(2, 496), KeyValue(2, 528), KeyValue(2, 561), KeyValue(2, 595)]> but: was <[KeyValue(2, 0), KeyValue(2, 1), KeyValue(2, 3), KeyValue(2, 6), KeyValue(2, 10), KeyValue(2, 15), KeyValue(2, 21), KeyValue(2, 28), KeyValue(2, 36), KeyValue(2, 45), KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 78), KeyValue(2, 91), KeyValue(2, 105), KeyValue(2, 120), KeyValue(2, 136), KeyValue(2, 153), KeyValue(2, 171), KeyValue(2, 190), KeyValue(2, 120), KeyValue(2, 136), KeyValue(2, 153), KeyValue(2, 171), KeyValue(2, 190), KeyValue(2, 210), 
KeyValue(2, 231), KeyValue(2, 253), KeyValue(2, 276), KeyValue(2, 300), KeyValue(2, 325), KeyValue(2, 351), KeyValue(2, 378), KeyValue(2, 406), KeyValue(2, 435), KeyValue(2, 210), KeyValue(2, 231), KeyValue(2, 253), KeyValue(2, 276), KeyValue(2, 300), KeyValue(2, 325), KeyValue(2, 351), KeyValue(2, 378), KeyValue(2, 406), KeyValue(2, 435), KeyValue(2, 465), KeyValue(2, 496), KeyValue(2, 528), KeyValue(2, 561), KeyValue(2, 595), KeyValue(2, 465), KeyValue(2, 496)]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.checkResultPerKey(EosBetaUpgradeIntegrationTest.java:1042) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyUncommitted(EosBetaUpgradeIntegrationTest.java:1005) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:738) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-9935) Kafka not releasing member from Consumer Group
[ https://issues.apache.org/jira/browse/KAFKA-9935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-9935. Resolution: Duplicate Closing as duplicate of KAFKA-9752. Please reopen if reproduced with 2.5.0 or newer. > Kafka not releasing member from Consumer Group > -- > > Key: KAFKA-9935 > URL: https://issues.apache.org/jira/browse/KAFKA-9935 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.1 > Environment: Linux >Reporter: Steve Kecskes >Priority: Major > > Hello. I am experiencing an issue where Kafka is not releasing members from a > consumer group when the member crashes. The consumer group is then stuck in > rebalancing state indefinitely. > In this consumer group, there is only 1 client. This client has the following > related settings: > {code:java} > auto.commit.interval.ms = 5000 > auto.offset.reset = earliest > bootstrap.servers = [austgkafka01.hk.eclipseoptions.com:9092] > check.crcs = true > client.dns.lookup = default > client.id = TraderAutomationViewServer_workaround_stuck_rebalance_20200427-0 > connections.max.idle.ms = 54 > default.api.timeout.ms = 6 > enable.auto.commit = true > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = TraderAutomationViewServer_workaround_stuck_rebalance_20200427 > heartbeat.interval.ms = 3000 > interceptor.classes = [] > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 1 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 16777216 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > 
request.timeout.ms = 3 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > session.timeout.ms = 1 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > value.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > {code} > If the client crashes (not a graceful exit from group) the group remains in > the following state indefinitely. > {code} > Warning: Consumer group > 'TraderAutomationViewServer_workaround_stuck_rebalance' is rebalancing. 
> GROUP TOPIC > PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID > HOSTCLIENT-ID > TraderAutomationViewServer_workaround_stuck_rebalance EventAdjustedVols 10 > 6984061 7839599 855538 - - >- > TraderAutomationViewServer_workaround_stuck_rebalance VolMetrics8 > 128459531 143736443 15276912- - >- > TraderAutomationViewServer_workaround_stuck_rebalance EventAdjustedVols 12 > 7216495 8106030 889535 - - >- > TraderAutomationViewServer_workaround_stuck_rebalance VolMetrics6 > 122921729 137377358 14455629- - >- > TraderAutomationViewServer_workaround_stuck_rebalance EventAdjustedVols 14 > 5457618 6171142 713524 - - >- > TraderAutomationViewServer_workaround_stuck_rebalance VolMetrics4 > 125647891 140542566 14894675- - >
[jira] [Resolved] (KAFKA-10105) Regression in group coordinator dealing with flaky clients joining while leaving
[ https://issues.apache.org/jira/browse/KAFKA-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-10105. - Resolution: Duplicate Closing as duplicate of KAFKA-9752. Please reopen if reproduced with 2.5.0. > Regression in group coordinator dealing with flaky clients joining while > leaving > > > Key: KAFKA-10105 > URL: https://issues.apache.org/jira/browse/KAFKA-10105 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 2.4.1 > Environment: Kafka 2.4.1 on jre 11 on debian 9 in docker > Reporter: William Reynolds > Priority: Major > > Since upgrading a cluster from 1.1.0 to 2.4.1, the broker no longer deals > correctly with a consumer sending a join after a leave. > What happens now is that if a consumer sends a leave and then follows up by > trying to send a join again as it is shutting down, the group coordinator adds > the leaving member to the group but never seems to heartbeat that member. > Since the consumer is then gone, when it joins again after starting it is > added as a new member, but the zombie member is still there and is included in the > partition assignment, which means that those partitions never get consumed > from. What can also happen is that one of the zombies becomes group leader, so > the rebalance gets stuck forever and the group is entirely blocked. > I have not been able to track down where this got introduced between 1.1.0 > and 2.4.1 but I will look further into this. Unfortunately the logs are > essentially silent about the zombie members, and I only had INFO level logging > on during the issue. By stopping all the consumers in the group and > restarting the broker coordinating that group we could get back to a working > state.
[jira] [Commented] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173953#comment-17173953 ] Mohammad Abdelqader commented on KAFKA-10378: - Thank you Ismael > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Blocker > Fix For: 2.7.0, 2.6.1 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at > java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 6 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173952#comment-17173952 ] Ismael Juma commented on KAFKA-10378: - I reopened the issue since this needs to be fixed. Adding the dependency is a workaround. > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Blocker > Fix For: 2.7.0, 2.6.1 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at > java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 
6 more
[jira] [Updated] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-10378: Fix Version/s: 2.7.0 > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Critical > Fix For: 2.7.0, 2.6.1 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at > java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 6 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-10378: Priority: Blocker (was: Critical) > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Blocker > Fix For: 2.7.0, 2.6.1 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at > java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 6 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reopened KAFKA-10378: - > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Critical > Fix For: 2.6.0 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at > java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 6 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-10378: Fix Version/s: (was: 2.6.0) 2.6.1 > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Critical > Fix For: 2.6.1 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at > java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 6 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173951#comment-17173951 ] Ismael Juma commented on KAFKA-10378: - Thanks for filing the issue. This looks like a regression in 2.6.0. As a workaround, you can add a dependency on the Jackson library in your project. > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Test > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Major > Fix For: 2.6.0 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at >
java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 6 more
[jira] [Updated] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-10378: Issue Type: Bug (was: Test) > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Critical > Fix For: 2.6.0 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at > java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 6 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-10378: Priority: Critical (was: Major) > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Test > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Critical > Fix For: 2.6.0 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at > java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 6 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173948#comment-17173948 ] Mohammad Abdelqader edited comment on KAFKA-10378 at 8/9/20, 6:43 PM: -- Solved by adding [jackson-databind|https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind] to the Maven dependencies was (Author: moha08): Solve by add [jackson-databind|https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind] in maven dependencies > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Test > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Major > Fix For: 2.6.0 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at >
java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at > java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 6 more
[jira] [Resolved] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mohammad Abdelqader resolved KAFKA-10378. - Fix Version/s: 2.6.0 Resolution: Fixed Solved by adding [jackson-databind|https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind] to the Maven dependencies > issue when create producer using java > - > > Key: KAFKA-10378 > URL: https://issues.apache.org/jira/browse/KAFKA-10378 > Project: Kafka > Issue Type: Test > Components: producer >Affects Versions: 2.6.0 > Environment: mac os > java version "1.8.0_231" > intellij >Reporter: Mohammad Abdelqader >Priority: Major > Fix For: 2.6.0 > > > I created simple producer using java by Intellij studio. When i run project , > it return following issue > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread > 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | > producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught > exception in thread 'kafka-producer-network-thread | > producer-1':java.lang.NoClassDefFoundError: > com/fasterxml/jackson/databind/JsonNode at > org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36) > at > org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) at > java.lang.ClassLoader.loadClass(ClassLoader.java:418) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at > java.lang.ClassLoader.loadClass(ClassLoader.java:351) ...
6 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
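A NoClassDefFoundError like the one quoted above means the class was absent from the runtime classpath. As a minimal sketch (the helper name ClasspathCheck is hypothetical and not part of Kafka), a project can verify that the Jackson class named in the stack trace is loadable before creating a producer:

```java
// Hypothetical diagnostic helper; the name ClasspathCheck is illustrative only.
final class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the stack trace in the report
        String jackson = "com.fasterxml.jackson.databind.JsonNode";
        if (isOnClasspath(jackson)) {
            System.out.println("jackson-databind is available");
        } else {
            System.out.println("Missing " + jackson
                    + ": add jackson-databind to the build dependencies");
        }
    }
}
```

This only diagnoses the symptom; the resolution recorded in the ticket is still to declare the jackson-databind dependency in the build.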
[jira] [Created] (KAFKA-10378) issue when create producer using java
Mohammad Abdelqader created KAFKA-10378:
-------------------------------------------

             Summary: issue when create producer using java
                 Key: KAFKA-10378
                 URL: https://issues.apache.org/jira/browse/KAFKA-10378
             Project: Kafka
          Issue Type: Test
          Components: producer
    Affects Versions: 2.6.0
         Environment: mac os
java version "1.8.0_231"
intellij
            Reporter: Mohammad Abdelqader

I created a simple producer in Java using IntelliJ. When I run the project, it fails with the following error:

[kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread 'kafka-producer-network-thread | producer-1':
java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/JsonNode
    at org.apache.kafka.common.requests.ApiVersionsRequest$Builder.<init>(ApiVersionsRequest.java:36)
    at org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 6 more
[jira] [Updated] (KAFKA-10376) Add end timestamp to Time Window key. Update TimeWindow Serializer/Deserializer to set end of window.
[ https://issues.apache.org/jira/browse/KAFKA-10376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BPR updated KAFKA-10376: Description: Aggregate stream consumers are not currently able to get the end timestamp for Time Windows from the Deserializer. This is due to the fact that the end timestamp is not present in the message key. The end timestamp is not currently added to the end of the key for Time Window aggregates. Allowing the Consumer to set the window size in the Deserializer's constructor is not sufficient as an objective consumer would not know what window size was being used. It also stands to reason that the aggregator could choose to change the window without notifying the consumer: e.g., 1 minute windows during business hours and 1 hour off business hours. It would also be possible for the same consumer to be consuming several topics at once, each with a different window on each topic. Update the code to add the end timestamp long onto the end of the key and update the Deserializer to extract and set the correct end on the Window. This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366. was: Aggregate stream consumers are not currently able to get the end timestamp for Time Windows from the Deserializer. This is due to the fact that the end timestamp is not present in the message key. The end timestamp is not currently added to the end of the key for Time Window aggregates. Update the code to add the end timestamp long onto the end of the key and update the Deserialized to extract and set the correct end on the Window. This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366. > Add end timestamp to Time Window key. Update TimeWindow > Serializer/Deserializer to set end of window. 
> -- > > Key: KAFKA-10376 > URL: https://issues.apache.org/jira/browse/KAFKA-10376 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.6.0 >Reporter: BPR >Priority: Major > Labels: Aggregation, Deserialize, Key > > Aggregate stream consumers are not currently able to get the end timestamp > for Time Windows from the Deserializer. This is due to the fact that the end > timestamp is not present in the message key. The end timestamp is not > currently added to the end of the key for Time Window aggregates. > Allowing the Consumer to set the window size in the Deserializer's > constructor is not sufficient as an objective consumer would not know what > window size was being used. It also stands to reason that the aggregator > could choose to change the window without notifying the consumer: e.g., 1 > minute windows during business hours and 1 hour off business hours. It would > also be possible for the same consumer to be consuming several topics at > once, each with a different window on each topic. > Update the code to add the end timestamp long onto the end of the key and > update the Deserializer to extract and set the correct end on the Window. > This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366. -- This message was sent by Atlassian Jira (v8.3.4#803005)
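The proposal above (append the end timestamp to the key so the deserializer can recover it) can be sketched roughly as follows. This is an illustration only: the class WindowKeyCodec and the byte layout [key bytes][8-byte start][8-byte end] are assumptions made for the example, not the format Kafka Streams actually uses.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative layout only: [serialized key bytes][8-byte start ms][8-byte end ms].
final class WindowKeyCodec {
    private static final int TIMESTAMP_SIZE = Long.BYTES;

    /** Appends the window start and end timestamps to the serialized key. */
    static byte[] encode(byte[] key, long startMs, long endMs) {
        ByteBuffer buf = ByteBuffer.allocate(key.length + 2 * TIMESTAMP_SIZE);
        buf.put(key).putLong(startMs).putLong(endMs);
        return buf.array();
    }

    /** Extracts the original key bytes (everything before the two timestamps). */
    static byte[] keyOf(byte[] encoded) {
        return Arrays.copyOfRange(encoded, 0, encoded.length - 2 * TIMESTAMP_SIZE);
    }

    /** Reads the window start from the second-to-last 8 bytes. */
    static long startOf(byte[] encoded) {
        return ByteBuffer.wrap(encoded).getLong(encoded.length - 2 * TIMESTAMP_SIZE);
    }

    /** Reads the window end from the last 8 bytes; no window size is needed. */
    static long endOf(byte[] encoded) {
        return ByteBuffer.wrap(encoded).getLong(encoded.length - TIMESTAMP_SIZE);
    }
}
```

With a layout like this, a consumer reading several topics with different window sizes could recover each window's end from the key alone, which is the point the description makes.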
[jira] [Created] (KAFKA-10377) Delete Useless Code
Bingkun.ji created KAFKA-10377:
----------------------------------

             Summary: Delete Useless Code
                 Key: KAFKA-10377
                 URL: https://issues.apache.org/jira/browse/KAFKA-10377
             Project: Kafka
          Issue Type: Improvement
          Components: clients
    Affects Versions: 2.6.0
            Reporter: Bingkun.ji
         Attachments: image-2020-08-10-00-13-28-744.png

Delete useless code for client.

!image-2020-08-10-00-13-28-744.png!
[GitHub] [kafka] lbradstreet commented on a change in pull request #9147: MINOR: supervise TransactionalMessageCopier producer
lbradstreet commented on a change in pull request #9147: URL: https://github.com/apache/kafka/pull/9147#discussion_r467590975 ## File path: tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java ## @@ -345,39 +357,55 @@ public void onPartitionsAssigned(Collection partitions) { ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); if (records.count() > 0) { try { -producer.beginTransaction(); - -for (ConsumerRecord record : records) { - producer.send(producerRecordFromConsumerRecord(outputTopic, record)); -} - -long messagesSentWithinCurrentTxn = records.count(); - -if (useGroupMetadata) { - producer.sendOffsetsToTransaction(consumerPositions(consumer), consumer.groupMetadata()); -} else { - producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup); -} - -if (enableRandomAborts && random.nextInt() % 3 == 0) { -throw new KafkaException("Aborting transaction"); -} else { -producer.commitTransaction(); - remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn); - numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn); - totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn); +try { +producer.get().beginTransaction(); + +for (ConsumerRecord record : records) { + producer.get().send(producerRecordFromConsumerRecord(outputTopic, record)); +} + +long messagesSentWithinCurrentTxn = records.count(); + +if (useGroupMetadata) { + producer.get().sendOffsetsToTransaction(consumerPositions(consumer), +consumer.groupMetadata()); +} else { + producer.get().sendOffsetsToTransaction(consumerPositions(consumer), +consumerGroup); +} + +if (enableRandomAborts && random.nextInt() % 3 == 0) { +throw new KafkaException("Aborting transaction"); +} else { +producer.get().commitTransaction(); + remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn); + numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn); + totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn); +} +} catch 
(ProducerFencedException | OutOfOrderSequenceException e) { +// handle these exception in the outer exception handling +throw e; +} catch (KafkaException e) { +// this may throw a ProducerFencedException on recovery +// this will handled in the outer catch if necessary + System.out.println(handledExceptionJson(totalMessageProcessed.get(), + numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, e)); +producer.get().abortTransaction(); +resetToLastCommittedPositions(consumer); } } catch (ProducerFencedException | OutOfOrderSequenceException e) { -// We cannot recover from these errors, so just rethrow them and let the process fail -throw e; -} catch (KafkaException e) { -producer.abortTransaction(); +// These failures are not recoverable with the same producer Review comment: I think this is a reasonable way to deal with these issues as the main thing we are trying to test here is that we produce an exact copy from the input topic to output topic, however I do have questions about what kind of exceptions we should allow for. Originally my change allowed for supervision of ProducerFencedException(s) but not OutOfOrderSequenceException(s). If we do not expect those in any cases where we are messing with cluster behavior (broker bounces, network partitioning, etc) then I can take the handling out for the OoOS case. -
[jira] [Assigned] (KAFKA-8812) Rebalance Producers
[ https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Werner Daehn reassigned KAFKA-8812:
-----------------------------------

    Assignee: Matthias J. Sax  (was: Werner Daehn)

> Rebalance Producers
> -------------------
>
>                 Key: KAFKA-8812
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8812
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 2.3.0
>            Reporter: Werner Daehn
>            Assignee: Matthias J. Sax
>            Priority: Major
>              Labels: kip
>
> [KIP-509: https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers]
>
> Please bear with me. At first this idea sounds stupid, but it has its merits.
>
> How do you build a distributed producer at the moment? You use Kafka Connect, which in turn requires a cluster that decides which instance produces which partitions.
>
> On the consumer side it is different. There, Kafka itself does the data distribution. If you have 10 Kafka partitions and 10 consumers, each will get data for one partition. With 5 consumers, each will get data from two partitions. And if there is only a single consumer active, it gets all the data. All of this is managed by Kafka; all you have to do is start as many consumers as you want.
>
> I'd like to suggest something similar for producers. A producer would tell Kafka that its source has 10 partitions. The Kafka server then responds with a list of partitions this instance shall be responsible for. If it is the only producer, the response would be all 10 partitions. If a second instance starts up, the first instance would be told to produce data for partitions 1-5 and the new one for partitions 6-10. If a producer fails to respond with an alive packet, a rebalance happens, informing the active producer to take more load, and the dead producer will get an error when it tries to send data again.
>
> For restart, the producer rebalance also has to send the starting point from which to resume producing data. It would be best if this is a user-generated pointer and not the topic offset. Then it can be, e.g., the database system change number, a database transaction id or something similar.
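The 10-partition example in the proposal can be made concrete with a small range-assignment sketch. The class and method names below are hypothetical; KIP-509 does not prescribe this algorithm, and the even contiguous split is simply one way to reproduce the "1-5 and 6-10" outcome described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the proposal; not an existing Kafka API.
final class ProducerPartitionAssignment {

    /**
     * Splits source partitions 1..partitionCount into contiguous ranges,
     * one per live producer. Earlier producers get one extra partition
     * when the count does not divide evenly.
     */
    static List<List<Integer>> assign(int partitionCount, int producerCount) {
        if (partitionCount < 0 || producerCount <= 0) {
            throw new IllegalArgumentException("need partitionCount >= 0 and producerCount > 0");
        }
        List<List<Integer>> result = new ArrayList<>();
        int base = partitionCount / producerCount;
        int extra = partitionCount % producerCount;
        int next = 1; // partitions are numbered from 1, as in the description
        for (int p = 0; p < producerCount; p++) {
            int size = base + (p < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(10, 1)); // a single producer owns all partitions
        System.out.println(assign(10, 2)); // two producers: 1-5 and 6-10
    }
}
```

A rebalance in this model is just a re-run of assign with the new number of live producers; the restart pointer mentioned in the last paragraph of the proposal is not modeled here.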
[GitHub] [kafka] lbradstreet opened a new pull request #9147: MINOR: supervise TransactionalMessageCopier producer
lbradstreet opened a new pull request #9147:
URL: https://github.com/apache/kafka/pull/9147

transactions_test and group_mode_transactions_test have proven quite brittle around timeouts (see 67f5b5de77d67c02edb335737215312d099a1cac, e099b58df5b3e4f87173fc55880f9c343308739f, d9fe30dab0fc56318b012731c348ed1ddae2ec04, 07db26c20fcbccbf758591607864f7fd4bd8975f). Continuing to tune these timeouts is a losing battle, especially if we want to increase the types of nemesis checks that we perform on transaction support (e.g. iptables-based partitioning). This PR creates a new producer when the prior one is in an unrecoverable state, allowing us to still test the required EOS invariants.
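The supervision idea in this PR description (replace the producer once it reaches an unrecoverable state) can be sketched without the Kafka client as follows. Everything here is a stand-in: TxProducer and FatalProducerException are hypothetical types playing the roles of the real producer and of errors such as ProducerFencedException, and the use of an AtomicReference is an assumption suggested by the producer.get() calls in the diff, not a copy of the PR's code.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Sketch of a supervised producer; all types here are hypothetical stand-ins.
final class SupervisedProducerSketch {

    /** Stand-in for a transactional producer; not a Kafka interface. */
    interface TxProducer extends AutoCloseable {
        void copyBatchInTransaction() throws FatalProducerException;

        @Override
        void close();
    }

    /** Stand-in for errors that cannot be recovered with the same producer. */
    static final class FatalProducerException extends Exception {
        FatalProducerException(String message) {
            super(message);
        }
    }

    private final Supplier<TxProducer> factory;
    private final AtomicReference<TxProducer> producer;
    private int restarts = 0;

    SupervisedProducerSketch(Supplier<TxProducer> factory) {
        this.factory = factory;
        this.producer = new AtomicReference<>(factory.get());
    }

    /** Runs one batch; returns true on success, false if the producer had to be replaced. */
    boolean runOnce() {
        try {
            producer.get().copyBatchInTransaction();
            return true;
        } catch (FatalProducerException e) {
            // The old producer is unusable: close it and continue with a fresh one.
            producer.get().close();
            producer.set(factory.get());
            restarts++;
            return false;
        }
    }

    int restarts() {
        return restarts;
    }
}
```

The sketch deliberately leaves out the transactional and consumer-side details (offset commits, resetting consumer positions) that the actual tool has to handle around a producer restart.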
[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version
[ https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173850#comment-17173850 ]

Dibyendu Bhattacharya commented on KAFKA-9752:
----------------------------------------------

Hi [~ijuma] [~hachikuji],

I am seeing a different issue now with this fix. Earlier the consumer group was stuck in the "PendingRebalance" state, which no longer happens, but now I see members that are not able to join the group. I see the logs below, where members are being removed after the session timeout.

[2020-08-09 09:29:00,558] INFO [GroupCoordinator 5]: *Pending member* XXX in group YYY *has been removed after session timeout expiration*. (kafka.coordinator.group.GroupCoordinator)
[2020-08-09 09:29:55,856] INFO [GroupCoordinator 5]: *Pending member* ZZZ in group YYY *has been removed after session timeout expiration*. (kafka.coordinator.group.GroupCoordinator)

Looking at the GroupCoordinator code, when a new member tries to join for the first time, GroupCoordinator also schedules an addPendingMemberExpiration (in the doUnknownJoinGroup call) with SessionTimeOut…

{code}
addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
{code}

If for some reason the addMemberAndRebalance call takes longer and the member is still in the "Pending" state, the above addPendingMemberExpiration can remove the pending member, and it then cannot join the group. I think that is what is happening.

For a new member, the coordinator is already setting a timeout in

{code}
completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
{code}

What is the requirement for one more addPendingMemberExpiration task?
> Consumer rebalance can be stuck after new member timeout with old JoinGroup version
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9752
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9752
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.2.2, 2.3.1, 2.4.1
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Blocker
>             Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.2
>
> For older versions of the JoinGroup protocol (v3 and below), there was no way for new consumer group members to get their memberId until the first rebalance completed. If the JoinGroup request timed out and the client disconnected, the member would nevertheless be left in the group until the rebalance completed and the session timeout expired.
>
> In order to prevent this situation from causing the group size to grow unboundedly, we added logic in KAFKA-7610 to limit the maximum time a new member will be left in the group before it would be kicked out (in spite of rebalance state).
>
> In KAFKA-9232, we addressed one issue with this solution. Basically the new member expiration logic did not properly get cancelled after the rebalance completed which means that in certain cases, a successfully joined member might get kicked out of the group unnecessarily.
>
> Unfortunately, this patch introduced a regression in the normal session expiration logic following completion of the initial rebalance. Basically the expiration task fails to get scheduled properly. The issue is in this function:
> {code}
> def shouldKeepAlive(deadlineMs: Long): Boolean = {
>   if (isNew) {
>     // New members are expired after the static join timeout
>     latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
>   } else if (isAwaitingJoin || isAwaitingSync) {
>     // Don't remove members as long as they have a request in purgatory
>     true
>   } else {
>     // Otherwise check for session expiration
>     latestHeartbeat + sessionTimeoutMs > deadlineMs
>   }
> }
> {code}
> We use this logic in order to check for session expiration. On the surface, there is nothing wrong with it, but it has an odd interaction with the purgatory. When the heartbeat is first scheduled with `tryCompleteElseWatch`, the code relies on `shouldKeepAlive` returning false so that the heartbeat task is not immediately completed. This only works because we update `latestHeartbeat` just prior to calling `tryCompleteElseWatch`, which means that the first or third checks will fail, `shouldKeepAlive` will return false, and the heartbeat expiration task will not be immediately completed.
>
> The bug in this case has to do with the case when `isNew` is true. When we schedule the session expiration task, the `isNew` flag is still set to true, which means we will hit the first check above. Since in most cases, the session timeout is less than the new member timeout of 5 minutes, the check is very likely to return true. This seems like what we would want, but as noted above, we rely on this function returning false when the expiration task
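The interaction described in the quoted issue can be reproduced with a small Java port of the `shouldKeepAlive` check. This is a sketch under stated assumptions: the deadline is taken to be `latestHeartbeat + sessionTimeoutMs` (computed right after the heartbeat timestamp is refreshed), the 5-minute constant comes from the description, and the class and parameter names are made up for the example.

```java
// Java port of the quoted Scala check, for illustration only.
final class HeartbeatCheckSketch {
    // "new member timeout of 5 minutes", per the issue description
    static final long NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000L;

    static boolean shouldKeepAlive(boolean isNew, boolean awaitingJoinOrSync,
                                   long latestHeartbeat, long sessionTimeoutMs,
                                   long deadlineMs) {
        if (isNew) {
            // New members are expired after the static join timeout
            return latestHeartbeat + NEW_MEMBER_JOIN_TIMEOUT_MS > deadlineMs;
        } else if (awaitingJoinOrSync) {
            // Members with a pending request are never removed
            return true;
        } else {
            // Otherwise check for session expiration
            return latestHeartbeat + sessionTimeoutMs > deadlineMs;
        }
    }

    public static void main(String[] args) {
        long latestHeartbeat = 1_000L;   // just refreshed
        long sessionTimeoutMs = 10_000L; // shorter than the 5-minute join timeout
        // Assumption: the expiration deadline is heartbeat time plus session timeout
        long deadlineMs = latestHeartbeat + sessionTimeoutMs;

        // Established member: 11000 > 11000 is false, so the task is not completed early
        System.out.println(shouldKeepAlive(false, false, latestHeartbeat, sessionTimeoutMs, deadlineMs));
        // Member still flagged as new: 301000 > 11000 is true, the case the issue calls out
        System.out.println(shouldKeepAlive(true, false, latestHeartbeat, sessionTimeoutMs, deadlineMs));
    }
}
```

Under these assumptions the first call prints false and the second prints true, matching the description's point that the scheduling logic relies on a false result that the `isNew` branch does not give.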
[jira] [Updated] (KAFKA-10376) Add end timestamp to Time Window key. Update TimeWindow Serializer/Deserializer to set end of window.
[ https://issues.apache.org/jira/browse/KAFKA-10376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BPR updated KAFKA-10376: Labels: Aggregation Deserialize Key (was: Deserialize) > Add end timestamp to Time Window key. Update TimeWindow > Serializer/Deserializer to set end of window. > -- > > Key: KAFKA-10376 > URL: https://issues.apache.org/jira/browse/KAFKA-10376 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.6.0 >Reporter: BPR >Priority: Major > Labels: Aggregation, Deserialize, Key > > Aggregate stream consumers are not currently able to get the end timestamp > for Time Windows from the Deserializer. This is due to the fact that the end > timestamp is not present in the message key. The end timestamp is not > currently added to the end of the key for Time Window aggregates. > Update the code to add the end timestamp long onto the end of the key and > update the Deserialized to extract and set the correct end on the Window. > This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10376) Add end timestamp to Time Window key. Update TimeWindow Serializer/Deserializer to set end of window.
[ https://issues.apache.org/jira/browse/KAFKA-10376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BPR updated KAFKA-10376: Description: Aggregate stream consumers are not currently able to get the end timestamp for Time Windows from the Deserializer. This is due to the fact that the end timestamp is not present in the message key. The end timestamp is not currently added to the end of the key for Time Window aggregates. Update the code to add the end timestamp long onto the end of the key and update the Deserialized to extract and set the correct end on the Window. This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366. was: The end timestamp is not currently added to the end of the key for Time Window aggregates. Update the code to add the end timestamp long onto the end of the key and update the Deserialized to extract and set the correct end on the Window. This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366. > Add end timestamp to Time Window key. Update TimeWindow > Serializer/Deserializer to set end of window. > -- > > Key: KAFKA-10376 > URL: https://issues.apache.org/jira/browse/KAFKA-10376 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.6.0 >Reporter: BPR >Priority: Major > Labels: Deserialize > > Aggregate stream consumers are not currently able to get the end timestamp > for Time Windows from the Deserializer. This is due to the fact that the end > timestamp is not present in the message key. The end timestamp is not > currently added to the end of the key for Time Window aggregates. > Update the code to add the end timestamp long onto the end of the key and > update the Deserialized to extract and set the correct end on the Window. > This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10376) Add end timestamp to Time Window key. Update TimeWindow Serializer/Deserializer to set end of window.
[ https://issues.apache.org/jira/browse/KAFKA-10376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BPR updated KAFKA-10376: Description: The end timestamp is not currently added to the end of the key for Time Window aggregates. Update the code to add the end timestamp long onto the end of the key and update the Deserialized to extract and set the correct end on the Window. This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366. was: When a _Consumer_ is consuming messages containing Time Windowed aggregates, it has no way of obtaining the _end_ timestamps of the window contained in the key; instead, it must specify the window size (which it likely does not know) or receive end timestamps set to Long.MAX_VALUE. -- thus, rendering the window essentially unbound. An objective Consumer should not be expected to know the window size of the aggregate messages it is consuming. This is especially true if it is consuming several different topics with windows off varying size. This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366. Issue Type: Improvement (was: Bug) Summary: Add end timestamp to Time Window key. Update TimeWindow Serializer/Deserializer to set end of window. (was: TimeWindowedDeserializer does not deserialize the end timestamp of time window aggregate message keys.) > Add end timestamp to Time Window key. Update TimeWindow > Serializer/Deserializer to set end of window. > -- > > Key: KAFKA-10376 > URL: https://issues.apache.org/jira/browse/KAFKA-10376 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.6.0 >Reporter: BPR >Priority: Major > Labels: Deserialize > > The end timestamp is not currently added to the end of the key for Time > Window aggregates. Update the code to add the end timestamp long onto the end > of the key and update the Deserialized to extract and set the correct end on > the Window. 
> This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366.