[jira] [Comment Edited] (KAFKA-6690) Priorities for Source Topics
[ https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659806#comment-16659806 ]

Bala Prassanna I edited comment on KAFKA-6690 at 10/23/18 3:31 AM:
-------------------------------------------------------------------

[~nafshartous] Sorry about the delay in reply. We use Kafka to process the asynchronous events of our Document Management System, such as preview generation and indexing for search. The traffic is generated via the Web and Desktop Sync applications. In such cases, we had to prioritise the traffic from web and consume it first. But this might lead to starvation of events from sync if the consumer is slow and the event rate from web is high. A solution that handles the starvation with a timeout, after which events are consumed normally for a specified period of time, would be great and would help us use our resources effectively.

was (Author: balaprassanna):
[~nafshartous] Sorry about the delay in reply. We use Kafka to process the asynchronous events of our Document Management System, such as preview generation and indexing for search. The traffic is generated via the Web and Desktop Sync applications. In such cases, we had to prioritise the traffic from web and consume it first. Though this may lead to starvation of events from sync, such an API will help us effectively manage the available resources and prioritise our traffic without allocating a separate server to handle the sync load.

> Priorities for Source Topics
> ----------------------------
>
>                 Key: KAFKA-6690
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6690
>             Project: Kafka
>          Issue Type: New Feature
>          Components: consumer
>            Reporter: Bala Prassanna I
>            Assignee: Nick Afshartous
>            Priority: Major
>
> We often encounter use cases where we need to prioritise source topics. If a
> consumer is listening to more than one topic, say, HighPriorityTopic and
> LowPriorityTopic, it should consume events from LowPriorityTopic only when
> all the events from HighPriorityTopic are consumed. This is needed in Kafka
> Streams processor topologies as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
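The starvation-timeout policy described in the comment above can be sketched as a small, self-contained gate; the class and method names below are hypothetical illustrations, not part of any Kafka proposal:

```java
// Hypothetical sketch of the starvation-timeout idea: serve only the
// high-priority topic until it has been idle for starvationTimeoutMs,
// then allow low-priority records through.
public class PriorityGate {
    private final long starvationTimeoutMs;
    private long lastHighPriorityMs;

    public PriorityGate(long starvationTimeoutMs, long nowMs) {
        this.starvationTimeoutMs = starvationTimeoutMs;
        this.lastHighPriorityMs = nowMs;
    }

    // Call whenever a poll returns records from the high-priority topic.
    public void onHighPriorityRecords(long nowMs) {
        lastHighPriorityMs = nowMs;
    }

    // Low-priority consumption is allowed only once the high-priority topic
    // has been idle for at least the starvation timeout.
    public boolean allowLowPriority(long nowMs) {
        return nowMs - lastHighPriorityMs >= starvationTimeoutMs;
    }

    public static void main(String[] args) {
        PriorityGate gate = new PriorityGate(100, 0);
        gate.onHighPriorityRecords(0);
        System.out.println(gate.allowLowPriority(50));   // false: high topic recently active
        System.out.println(gate.allowLowPriority(200));  // true: idle past the timeout
    }
}
```

In a real consumer this gate would typically be paired with `KafkaConsumer.pause()` and `KafkaConsumer.resume()` on the low-priority topic's partitions.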
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659935#comment-16659935 ]

Ewen Cheslack-Postava commented on KAFKA-7481:
----------------------------------------------

[~ijuma] As it stands today, is the only use case for inter.broker.protocol.version to hold back the version during a rolling upgrade? Do we even really need this with KIP-35? It sounds like, aside from the two-rolling-bounce upgrade, you expect people wouldn't generally be using this today (given you say the benefit is minimal)? I'm fine overloading one of them if we think the existing use case for it is either OK to compromise with additional restrictions, OR the tradeoffs aren't weird like they are for log.message.format.version. Test surface area is a fair point; wrt complexity, I guess I don't see that much of a difference for users, since they just copy/paste two lines rather than one and otherwise probably blindly follow the directions.

> Consider options for safer upgrade of offset commit value schema
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7481
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7481
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Priority: Blocker
>             Fix For: 2.1.0
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The
> use of the new schema version is controlled by the
> `inter.broker.protocol.version` configuration. Once the new inter-broker
> version is in use, it is not possible to downgrade since the older brokers
> will not be able to parse the new schema.
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep
> `inter.broker.protocol.version` locked to the old release. Downgrade will
> still be possible, but users will not be able to test new capabilities which
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use
> `message.format.version`. This would basically extend the use of this config
> to apply to all persistent formats. The advantage is that it allows users to
> upgrade the broker and begin using the new inter-broker protocol while still
> allowing downgrade. But features which depend on the persistent format could
> not be tested.
> Any other options?
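For context, the "two rolling bounce" upgrade referenced in the comment follows the pattern from the Kafka upgrade notes; the version numbers below are illustrative, not tied to any particular release:

```properties
# First rolling bounce: run the new broker binaries but pin the inter-broker
# protocol (and the message format) to the old version, so that the cluster
# can still be downgraded if something goes wrong.
inter.broker.protocol.version=1.1
log.message.format.version=1.1

# Second rolling bounce (once every broker runs the new binaries): raise the
# versions. From this point on, downgrade is no longer supported, because
# older brokers cannot parse the newer persistent formats.
# inter.broker.protocol.version=2.0
# log.message.format.version=2.0
```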
[jira] [Updated] (KAFKA-7299) batch LeaderAndIsr requests during auto preferred leader election
[ https://issues.apache.org/jira/browse/KAFKA-7299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-7299:
---------------------------
    Fix Version/s: 2.0.1
                   1.1.2

Kevin, since this is a small patch, I merged it to both the 1.1 and 2.0 branches.

> batch LeaderAndIsr requests during auto preferred leader election
> -----------------------------------------------------------------
>
>                 Key: KAFKA-7299
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7299
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 2.0.0
>            Reporter: Jun Rao
>            Assignee: huxihx
>            Priority: Major
>             Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Currently, in KafkaController.checkAndTriggerAutoLeaderRebalance(), we call
> onPreferredReplicaElection() one partition at a time. This means that the
> controller will be sending LeaderAndIsrRequest one partition at a time. It
> would be more efficient to call onPreferredReplicaElection() for a batch of
> partitions to reduce the number of LeaderAndIsrRequests.
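The inefficiency described in the ticket, and the shape of the fix, can be illustrated with a toy sketch. The names below are stand-ins; the real controller code is Scala and sends LeaderAndIsrRequest through the controller channel manager:

```java
import java.util.List;

// Toy illustration of KAFKA-7299: invoking the election once per partition
// sends one LeaderAndIsrRequest each, while passing the whole batch lets the
// controller cover all partitions with a single request.
public class LeaderElectionBatching {

    // One call per partition: N partitions -> N requests.
    static int electPerPartition(List<String> partitions) {
        int requestsSent = 0;
        for (String tp : partitions) {
            requestsSent++; // stand-in for onPreferredReplicaElection(Set(tp))
        }
        return requestsSent;
    }

    // Batched: N partitions -> 1 request.
    static int electBatched(List<String> partitions) {
        // stand-in for onPreferredReplicaElection(partitions)
        return partitions.isEmpty() ? 0 : 1;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("topic-0", "topic-1", "topic-2");
        System.out.println(electPerPartition(partitions)); // 3
        System.out.println(electBatched(partitions));      // 1
    }
}
```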
[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
[ https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659857#comment-16659857 ]

Matthias J. Sax commented on KAFKA-7510:
----------------------------------------

Thanks [~MrKafka]. Works for me. If you are quick, we can try to get this into the 2.0.1 bug fix release.

> KStreams RecordCollectorImpl leaks data to logs on error
> --------------------------------------------------------
>
>                 Key: KAFKA-7510
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7510
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Mr Kafka
>            Priority: Major
>              Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data
> on error, as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information
> (PII) or other secret information to plain-text log files, which can then be
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are
> useful for some people, while error level contains the *errorMessage,
> timestamp, topic* and *stackTrace*.
> {code:java}
> private void recordSendError(
>     final K key,
>     final V value,
>     final Long timestamp,
>     final String topic,
>     final Exception exception
> ) {
>     String errorLogMessage = LOG_MESSAGE;
>     String errorMessage = EXCEPTION_MESSAGE;
>     if (exception instanceof RetriableException) {
>         errorLogMessage += PARAMETER_HINT;
>         errorMessage += PARAMETER_HINT;
>     }
>     log.error(errorLogMessage, key, value, timestamp, topic,
>         exception.toString());
>     sendException = new StreamsException(
>         String.format(
>             errorMessage,
>             logPrefix,
>             "an error caught",
>             key,
>             value,
>             timestamp,
>             topic,
>             exception.toString()
>         ),
>         exception);
> }{code}
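The mitigation discussed in the ticket (keep the ERROR log free of sensitive fields, move the payload to an opt-in level such as TRACE) can be sketched independently of the Streams internals. The class and message formats below are illustrative only, not the actual RecordCollectorImpl code:

```java
// Illustrative sketch of the proposed split: the ERROR log carries only
// non-sensitive fields, while the key/value payload is emitted separately
// at a level (e.g. TRACE) that must be explicitly enabled.
public class RedactedErrorLogging {

    // What would go to log.error(...): topic, timestamp, exception only.
    static String errorMessage(String topic, long timestamp, Exception e) {
        return String.format("Error sending record to topic %s, timestamp %d: %s",
                topic, timestamp, e.toString());
    }

    // What would go to log.trace(...): the potentially sensitive payload.
    static String traceMessage(Object key, Object value) {
        return String.format("Failed record payload: key=%s value=%s", key, value);
    }

    public static void main(String[] args) {
        Exception cause = new RuntimeException("broker unavailable");
        String err = errorMessage("orders", 1540252800000L, cause);
        String trc = traceMessage("user-42", "ssn=123-45-6789");
        System.out.println(err.contains("ssn"));  // false: payload kept out of ERROR
        System.out.println(trc.contains("ssn"));  // true: visible only when TRACE is on
    }
}
```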
[jira] [Comment Edited] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config
[ https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659824#comment-16659824 ]

Sophie Qian edited comment on KAFKA-7530 at 10/22/18 11:41 PM:
---------------------------------------------------------------

Our problem is with running Confluent Schema Registry in a Docker container. I got the following warning message after exporting SCHEMA_REGISTRY_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG="":

[2018-10-22 17:58:10,058] WARN The configuration 'ssl.endpoint.identification.algorithm.config' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)

The value for ssl.endpoint.identification.algorithm.config in /etc/schema-registry/schema-registry.properties is not taken into consideration; it's an empty string in the properties file (ssl.endpoint.identification.algorithm.config=).

The Schema Registry log shows ssl.endpoint.identification.algorithm is using "https":

[2018-10-22 23:26:43,039] INFO AdminClientConfig values:
    ..
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https

> Need to allow overwrite ssl.endpoint.identification.algorithm.config
> --------------------------------------------------------------------
>
>                 Key: KAFKA-7530
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7530
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.0.0
>            Reporter: Sophie Qian
>            Priority: Major
>
> We are in the process of upgrading our system to use Confluent 5.0.0 (which
> is using Kafka 2.0.0). I found that SslConfigs
> (clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has
> the following change:
>
> KAFKA-3665: Enable TLS hostname verification by default (KIP-294) (#4956)
> Make HTTPS the default ssl.endpoint.identification.algorithm.
>
> But users can not overwrite ssl.endpoint.identification.algorithm; only the
> following values are reconfigurable:
>
> {code:java}
> public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(
>     SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
>     SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
>     SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
>     SslConfigs.SSL_KEY_PASSWORD_CONFIG,
>     SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
>     SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
>     SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
> {code}
>
> Please make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
> reconfigurable.
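One likely explanation for the WARN above, worth noting: the Java constant SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG resolves to the config key `ssl.endpoint.identification.algorithm`, without a trailing `.config`, so a properties entry named `ssl.endpoint.identification.algorithm.config` is not a recognized config. A minimal static override, assuming that key name:

```properties
# Disable TLS hostname verification by setting the algorithm to the empty string.
# Note: no trailing ".config" in the key name; the "_CONFIG" suffix belongs to
# the Java constant (SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG), not the key.
ssl.endpoint.identification.algorithm=
```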
[jira] [Commented] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config
[ https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659778#comment-16659778 ]

Ismael Juma commented on KAFKA-7530:
------------------------------------

Via server.properties.
[jira] [Commented] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config
[ https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659777#comment-16659777 ]

Sophie Qian commented on KAFKA-7530:
------------------------------------

[~ijuma] How can we override this config? Can you provide some details? Thanks.
[jira] [Commented] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config
[ https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659772#comment-16659772 ]

Ismael Juma commented on KAFKA-7530:
------------------------------------

To be clear, you can override the config, you just can't do it dynamically.
[jira] [Assigned] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config
[ https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma reassigned KAFKA-7530:
----------------------------------

    Assignee: Ismael Juma
[jira] [Assigned] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config
[ https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma reassigned KAFKA-7530:
----------------------------------

    Assignee: (was: Ismael Juma)
[jira] [Comment Edited] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
[ https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659769#comment-16659769 ]

Mr Kafka edited comment on KAFKA-7510 at 10/22/18 10:33 PM:
------------------------------------------------------------

[~mjsax]
{quote}Why does it not contradict KAFKA-6538? It is about adding key/value in human readable form to exception messages that end up in the logs. While this ticket is about removing key/value data from the logs. What do you mean by "it only affects it's implementation"?{quote}
You can still add key/value/headers or as much useful data as you want to the log messages; those log messages will be under TRACE level, and a user has to explicitly turn TRACE level on to see the extra information.

Happy to create a PR to move key/value logging in RecordCollectorImpl to trace, as it directly blocks me, and I'm likely to do this in a private fork regardless until it is in an official release. Not so interested in creating a KIP and long discussions on moving sensitive log data to a different log level, which is as simple as adding a *log.trace(...)* below the *log.error(...)* statements.

was (Author: mrkafka):
[~mjsax]
{quote}Why does it not contradict KAFKA-6538? It is about adding key/value in human readable form to exception messages that end up in the logs. While this ticket is about removing key/value data from the logs. What do you mean by "it only affects it's implementation"?{quote}
You can still add key/value/headers or as much useful data as you want to the log messages; those log messages will be under TRACE level, and a user has to explicitly turn TRACE level on to see the extra information.

Happy to create a PR to move key/value logging in RecordCollectorImpl to trace, as it directly blocks me, and I'm likely to do this in a private fork regardless until it is done. Not so interested in creating a KIP and long discussions on moving sensitive log data to a different log level, which is as simple as adding a *log.trace(...)* below the *log.error(...)* statements.
[jira] [Created] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config
Sophie Qian created KAFKA-7530:
----------------------------------

             Summary: Need to allow overwrite ssl.endpoint.identification.algorithm.config
                 Key: KAFKA-7530
                 URL: https://issues.apache.org/jira/browse/KAFKA-7530
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.0.0
            Reporter: Sophie Qian

We are in the process of upgrading our system to use Confluent 5.0.0 (which is using Kafka 2.0.0). I found that SslConfigs (clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has the following change:

KAFKA-3665: Enable TLS hostname verification by default (KIP-294) (#4956)
Make HTTPS the default ssl.endpoint.identification.algorithm.

But users can not overwrite ssl.endpoint.identification.algorithm; only the following values are reconfigurable:

{code:java}
public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(
    SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
    SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
    SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
    SslConfigs.SSL_KEY_PASSWORD_CONFIG,
    SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
    SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
    SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
{code}

Please make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG reconfigurable.
[jira] [Commented] (KAFKA-6690) Priorities for Source Topics
[ https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659562#comment-16659562 ]

Nick Afshartous commented on KAFKA-6690:
----------------------------------------

[~balaprassanna] Just checking again if you can help with elaborating on your use cases? This would make it easier to facilitate a vote on this feature request.
[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
[ https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659432#comment-16659432 ] Matthias J. Sax commented on KAFKA-7510: Why does it not contradict KAFKA-6538? That one is about adding key/value in human-readable form to exception messages that end up in the logs, while this ticket is about removing key/value data from the logs. What do you mean by "it only affects it's implementation"? For all the other points: I think discussing this on the ticket is not the right place. Would you like to drive a discussion [~MrKafka]? If yes, please send a "[DISCUSS]" email to the dev mailing list or maybe propose a KIP (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals). The goal should be to agree on a global strategy on how to handle this, and this should be documented in the Wiki IMHO to have a reference. For concrete violations of the strategy (e.g., in RecordCollectorImpl) we can create individual tickets like this one to close the gaps. Thoughts? > KStreams RecordCollectorImpl leaks data to logs on error > > > Key: KAFKA-7510 > URL: https://issues.apache.org/jira/browse/KAFKA-7510 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Mr Kafka >Priority: Major > Labels: user-experience > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data > on error as it dumps the *value* / message payload to the logs. > This is problematic as it may contain personally identifiable information > (pii) or other secret information to plain text log files which can then be > propagated to other log systems i.e Splunk. > I suggest the *key*, and *value* fields be moved to debug level as it is > useful for some people while error level contains the *errorMessage, > timestamp, topic* and *stackTrace*. 
> {code:java}
> private void recordSendError(
>     final K key,
>     final V value,
>     final Long timestamp,
>     final String topic,
>     final Exception exception
> ) {
>     String errorLogMessage = LOG_MESSAGE;
>     String errorMessage = EXCEPTION_MESSAGE;
>     if (exception instanceof RetriableException) {
>         errorLogMessage += PARAMETER_HINT;
>         errorMessage += PARAMETER_HINT;
>     }
>     log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
>     sendException = new StreamsException(
>         String.format(
>             errorMessage,
>             logPrefix,
>             "an error caught",
>             key,
>             value,
>             timestamp,
>             topic,
>             exception.toString()
>         ),
>         exception);
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
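The split the ticket asks for (metadata at error level, payloads only at debug level) can be sketched independently of Streams. The logger below is a stand-in built on StringBuilder so the behaviour is checkable; it is not the real RecordCollectorImpl API:

```java
// Sketch of the redaction the ticket proposes: error-level output carries only
// metadata (topic, timestamp, exception), while key/value payloads are emitted
// at debug level for operators who opt in.
public class RedactingErrorLogger {
    private final StringBuilder errorLog = new StringBuilder();
    private final StringBuilder debugLog = new StringBuilder();

    public void recordSendError(String key, String value, long timestamp,
                                String topic, Exception exception) {
        // Safe to forward to aggregation systems (e.g. Splunk): no payload data.
        errorLog.append(String.format("send failed: topic=%s timestamp=%d error=%s%n",
                topic, timestamp, exception));
        // Payloads only at debug level.
        debugLog.append(String.format("failed record: key=%s value=%s%n", key, value));
    }

    public String errorLog() { return errorLog.toString(); }
    public String debugLog() { return debugLog.toString(); }
}
```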
[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable
[ https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659332#comment-16659332 ] ASF GitHub Bot commented on KAFKA-7519: --- ijuma closed pull request #5820: KAFKA-7519 Clear pending transaction state when expiration fails URL: https://github.com/apache/kafka/pull/5820 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 574c64e2c65..589407c2a2c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -166,37 +166,32 @@ class TransactionStateManager(brokerId: Int,
         (topicPartition, records)
       }
-
     def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
       responses.foreach { case (topicPartition, response) =>
-        response.error match {
-          case Errors.NONE =>
-            inReadLock(stateLock) {
-              val toRemove = transactionalIdByPartition(topicPartition.partition())
-              transactionMetadataCache.get(topicPartition.partition)
-                .foreach { txnMetadataCacheEntry =>
-                  toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                    val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
-                    txnMetadata.inLock {
-                      if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                        && txnMetadata.pendingState.contains(Dead)
-                        && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                      )
-                        txnMetadataCacheEntry.metadataPerTransactionalId.remove(idCoordinatorEpochAndMetadata.transactionalId)
-                      else {
-                        debug(s"failed to remove expired transactionalId: ${idCoordinatorEpochAndMetadata.transactionalId}" +
-                          s" from cache. pendingState: ${txnMetadata.pendingState} producerEpoch: ${txnMetadata.producerEpoch}" +
-                          s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}" +
-                          s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch} expected coordinatorEpoch: " +
-                          s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-                        txnMetadata.pendingState = None
-                      }
-                    }
-                  }
-                }
+        inReadLock(stateLock) {
+          val toRemove = transactionalIdByPartition(topicPartition.partition)
+          transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
+            toRemove.foreach { idCoordinatorEpochAndMetadata =>
+              val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
+              val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
+              txnMetadata.inLock {
+                if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
+                  && txnMetadata.pendingState.contains(Dead)
+                  && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
+                  && response.error == Errors.NONE) {
+                  txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
+                } else {
+                  warn(s"Failed to remove expired transactionalId: $transactionalId" +
+                    s" from cache. Tombstone append error code: ${response.error}," +
+                    s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
+                    s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
+                    s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
+                    s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
+                  txnMetadata.pendingState = None
+                }
+              }
+            }
+          }
-        case _ =>
-
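The shape of the fix above: whenever the tombstone append fails or the cached epoch/state no longer matches, the expiration path must clear pendingState so the transactional id does not remain stuck in pending Dead. A much-simplified Java rendering of that control flow (the real code is the Scala in TransactionStateManager; all names here are illustrative):

```java
import java.util.Optional;

// Simplified rendering of the KAFKA-7519 fix: on any failed expiration
// (tombstone append error, or stale epoch/state), clear the pending state
// instead of leaving the transactional id permanently unusable.
public class TxnMetadata {
    public enum State { ONGOING, DEAD }

    public Optional<State> pendingState = Optional.empty();
    public boolean removedFromCache = false;

    public void completeExpiration(boolean epochsMatch, boolean appendSucceeded) {
        if (epochsMatch && pendingState.equals(Optional.of(State.DEAD)) && appendSucceeded) {
            removedFromCache = true;         // expiration fully succeeded
        } else {
            pendingState = Optional.empty(); // key fix: unstick the id on any failure
        }
    }
}
```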
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659227#comment-16659227 ] Ismael Juma commented on KAFKA-7481: The key insight for me is that the benefit of having a separate config for the stateless parts is minimal, and increasing the testing surface area for both upgrades and downgrades is not desirable (in addition to the added complexity for the user). Yes, these things should ideally not be done in minor releases, but we are where we are. > Consider options for safer upgrade of offset commit value schema > > > Key: KAFKA-7481 > URL: https://issues.apache.org/jira/browse/KAFKA-7481 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > Fix For: 2.1.0 > > > KIP-211 and KIP-320 add new versions of the offset commit value schema. The > use of the new schema version is controlled by the > `inter.broker.protocol.version` configuration. Once the new inter-broker > version is in use, it is not possible to downgrade since the older brokers > will not be able to parse the new schema. > The options at the moment are the following: > 1. Do nothing. Users can try the new version and keep > `inter.broker.protocol.version` locked to the old release. Downgrade will > still be possible, but users will not be able to test new capabilities which > depend on inter-broker protocol changes. > 2. Instead of using `inter.broker.protocol.version`, we could use > `message.format.version`. This would basically extend the use of this config > to apply to all persistent formats. The advantage is that it allows users to > upgrade the broker and begin using the new inter-broker protocol while still > allowing downgrade. But features which depend on the persistent format could > not be tested. > Any other options? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7529) Kafka Connect JDBC doesn't push new records to Kafka Topic unless the connector is restarted
Kashyap Ivaturi created KAFKA-7529: -- Summary: Kafka Connect JDBC doesn't push new records to Kafka Topic unless the connector is restarted Key: KAFKA-7529 URL: https://issues.apache.org/jira/browse/KAFKA-7529 Project: Kafka Issue Type: Task Components: KafkaConnect Reporter: Kashyap Ivaturi Hi, We have a Kafka Connect JDBC Source Connector which keeps polling for new records in an Oracle table every minute and pushes the new records to a Kafka topic. New records are determined by an incrementing column. In general everything works well, but once in a while we see that even though there are new records with the incrementing column, those records don't get pushed to the topic. There is no clue of any error in the logs and the connector is in the running state. Only after we restart the connector are the new records pushed to the topic. Any idea in what situation this can happen? Rgds Kashyap. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
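For context, a JDBC source connector of the kind described above (one-minute polling on an incrementing column) is typically configured along these lines; the connection details, table, and names below are placeholders, not taken from the report:

```properties
name=oracle-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@//dbhost:1521/ORCL
mode=incrementing
incrementing.column.name=ID
table.whitelist=MY_TABLE
topic.prefix=oracle-
poll.interval.ms=60000
```

With `mode=incrementing`, rows are only picked up when the tracked column value is strictly greater than the stored offset, which is one place to look when new rows silently stop appearing.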
[jira] [Created] (KAFKA-7528) Make Min and Max metrics' default value consistent with each other
Stanislav Kozlovski created KAFKA-7528: -- Summary: Make Min and Max metrics' default value consistent with each other Key: KAFKA-7528 URL: https://issues.apache.org/jira/browse/KAFKA-7528 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Fix For: 2.2.0 KIP-386: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable
[ https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7519: --- Fix Version/s: 2.0.1 > Transactional Ids Left in Pending State by TransactionStateManager During > Transactional Id Expiration Are Unusable > -- > > Key: KAFKA-7519 > URL: https://issues.apache.org/jira/browse/KAFKA-7519 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.0.0 >Reporter: Bridger Howell >Priority: Blocker > Fix For: 2.0.1, 2.1.0 > > Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png > > > > After digging into a case where an exactly-once streams process was bizarrely > unable to process incoming data, we observed the following: > * StreamThreads stalling while creating a producer, eventually resulting in > no consumption by that streams process. Looking into those threads, we found > they were stuck in a loop, sending InitProducerIdRequests and always > receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. > These requests always had the same transactional id. > * After changing the streams process to not use exactly-once, it was able to > process messages with no problems. > * Alternatively, changing the applicationId for that streams process, it was > able to process with no problems. 
> * Every hour, every broker would fail the task `transactionalId-expiration` with the following error:
> **
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing transaction state transition to Dead while it already a pending state Dead
> at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
> at kafka.coordinator.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
> at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
> at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
> at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:150)
> at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
> at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TransactionStateManager.scala:142)
> at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
[jira] [Commented] (KAFKA-7501) double deallocation of producer batch upon expiration of inflight requests and error response
[ https://issues.apache.org/jira/browse/KAFKA-7501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658791#comment-16658791 ] ASF GitHub Bot commented on KAFKA-7501: --- ijuma closed pull request #5807: KAFKA-7501: fix producer batch double deallocation when receiving message too large error on expired batch URL: https://github.com/apache/kafka/pull/5807 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 6e08185611d..0adbbf962cd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -72,9 +72,8 @@
     private long lastAttemptMs;
     private long lastAppendTime;
     private long drainedMs;
-    private String expiryErrorMessage;
     private boolean retry;
-    private boolean reopened = false;
+    private boolean reopened;

     public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) {
         this(tp, recordsBuilder, createdMs, false);
@@ -156,6 +155,13 @@ public void abort(RuntimeException exception) {
         completeFutureAndFireCallbacks(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, exception);
     }

+    /**
+     * Return `true` if {@link #done(long, long, RuntimeException)} has been invoked at least once, `false` otherwise.
+     */
+    public boolean isDone() {
+        return finalState() != null;
+    }
+
     /**
      * Finalize the state of a batch. Final state, once set, is immutable. This function may be called
      * once or twice on a batch. It may be called twice if
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index c50a85f06da..19d7af2e7a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -184,9 +184,9 @@ public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
             if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
                 iter.remove();
                 // expireBatches is called in Sender.sendProducerData, before client.poll.
-                // The batch.finalState() == null invariant should always hold. An IllegalStateException
+                // The !batch.isDone() invariant should always hold. An IllegalStateException
                 // exception will be thrown if the invariant is violated.
-                if (batch.finalState() == null) {
+                if (!batch.isDone()) {
                     expiredBatches.add(batch);
                 } else {
                     throw new IllegalStateException(batch.topicPartition + " batch created at " +
@@ -576,7 +576,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response,
                                long now, long throttleUntilTimeMs) {
         Errors error = response.error;
-        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
+        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
             (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
             // If the batch is too large, we split the batch and send the split batches again. We do not decrement
             // the retry attempts in this case.
@@ -726,7 +726,7 @@ private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime,
     private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
         return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
             batch.attempts() < this.retries &&
-            batch.finalState() == null &&
+            !batch.isDone() &&
             ((response.error.exception() instanceof RetriableException) ||
                 (transactionManager != null && transactionManager.canRetry(response, batch)));
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 23ca2aeaed7..8a8ddd35836 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++
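The heart of the patch is the new isDone() helper: final state is set at most a limited number of times, isDone() is a null check over it, and the expiry, split, and retry paths all consult it before touching the batch again. A simplified stand-alone sketch of that pattern (the real ProducerBatch permits one legitimate second transition; this sketch keeps only the first-call-wins core):

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified sketch of the "final state set once" pattern: isDone() reports
// whether done(...) has ever been invoked, so racing code paths (expiry vs.
// a late error response) cannot both finalize the batch.
public class BatchState {
    public enum FinalState { SUCCEEDED, FAILED, ABORTED }

    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

    /** Attempt to finalize; only the first call wins. Returns true if this call set the state. */
    public boolean done(FinalState state) {
        return finalState.compareAndSet(null, state);
    }

    public FinalState finalState() {
        return finalState.get();
    }

    /** True once done(...) has been invoked at least once. */
    public boolean isDone() {
        return finalState() != null;
    }
}
```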
[jira] [Commented] (KAFKA-7525) Handling corrupt records
[ https://issues.apache.org/jira/browse/KAFKA-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658756#comment-16658756 ] Ismael Juma commented on KAFKA-7525: That IllegalStateException is a clear bug. Please test with a more recent version as it may have been fixed already.
> Handling corrupt records
>
> Key: KAFKA-7525
> URL: https://issues.apache.org/jira/browse/KAFKA-7525
> Project: Kafka
> Issue Type: Improvement
> Components: consumer, core
> Affects Versions: 1.1.0
> Reporter: Katarzyna Solnica
> Priority: Major
>
> When Java consumer encounters a corrupt record on a partition it reads from, it throws:
> {code:java}
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from XYZ. If needed, please seek past the record to continue consumption.
> at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1125)
> at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> (...)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size is less than the minimum record overhead (14){code}
> or:
> {code:java}
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:936)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> (...){code}
> 1. Could you consider throwing CorruptRecordException from parseCompletedFetch() when error == Errors.CORRUPT_MESSAGE?
> 2. Seeking past the corrupt record means losing data. I've noticed that the record is often correct on a follower ISR, and manual change of the partition leader to the follower node solves the issue in case partition is used by a single consumer group. Couldn't Kafka server discover such situations and recover corrupt records from logs available on other ISRs somehow?
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7525) Handling corrupt records
[ https://issues.apache.org/jira/browse/KAFKA-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658747#comment-16658747 ] Stanislav Kozlovski commented on KAFKA-7525: Hi [~Solnica], thanks for the report! Regarding 1., there is some ongoing work that changes which errors are thrown when message corruption is detected. The issue we currently have is that we don't provide an easy way to seek past the corrupt record itself. Here is the [KIP-344|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793]
> Handling corrupt records
>
> Key: KAFKA-7525
> URL: https://issues.apache.org/jira/browse/KAFKA-7525
> Project: Kafka
> Issue Type: Improvement
> Components: consumer, core
> Affects Versions: 1.1.0
> Reporter: Katarzyna Solnica
> Priority: Major
>
> When Java consumer encounters a corrupt record on a partition it reads from, it throws:
> {code:java}
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from XYZ. If needed, please seek past the record to continue consumption.
> at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1125)
> at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> (...)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size is less than the minimum record overhead (14){code}
> or:
> {code:java}
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:936)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> (...){code}
> 1. Could you consider throwing CorruptRecordException from parseCompletedFetch() when error == Errors.CORRUPT_MESSAGE?
> 2. Seeking past the corrupt record means losing data. I've noticed that the record is often correct on a follower ISR, and manual change of the partition leader to the follower node solves the issue in case partition is used by a single consumer group. Couldn't Kafka server discover such situations and recover corrupt records from logs available on other ISRs somehow?
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7501) double deallocation of producer batch upon expiration of inflight requests and error response
[ https://issues.apache.org/jira/browse/KAFKA-7501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7501: --- Fix Version/s: (was: 2.0.1) > double deallocation of producer batch upon expiration of inflight requests > and error response > - > > Key: KAFKA-7501 > URL: https://issues.apache.org/jira/browse/KAFKA-7501 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: xiongqi wu >Assignee: xiongqi wu >Priority: Critical > Fix For: 2.1.0 > > > The following event sequence will lead to double deallocation of a producer > batch. > 1) a producer batch is sent and the response is not received. > 2) the inflight producer batch is expired when deliveryTimeoutMs has been reached. > The sender fails the producer batch via "failBatch" and the producer batch > is deallocated via "accumulator.deallocate(batch)". > 3) the response for the batch finally arrives after batch expiration, and the > response contains the error "Errors.MESSAGE_TOO_LARGE". > 4) the producer batch is split and the original batch is deallocated a second > time. As a result, an "IllegalStateException" will be raised. -- This message was sent by Atlassian JIRA (v7.6.3#76005)