[jira] [Comment Edited] (KAFKA-6690) Priorities for Source Topics

2018-10-22 Thread Bala Prassanna I (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659806#comment-16659806
 ] 

Bala Prassanna I edited comment on KAFKA-6690 at 10/23/18 3:31 AM:
---

[~nafshartous] Sorry about the delay in replying. We use Kafka to process the 
asynchronous events of our Document Management System, such as preview 
generation and indexing for search. The traffic is generated by the Web and 
Desktop Sync applications. In such cases, we had to prioritize the traffic from 
web and consume it first. But this might starve events from sync if the 
consumer is slow and the event rate from web is high. A way to handle that 
starvation, such as a timeout after which events are consumed normally for a 
specified period of time, would be great and would help us use our resources 
effectively.
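
For reference, a rough client-side sketch of the behaviour described above, 
built on the existing consumer pause()/resume() API with a starvation timeout. 
The topic names, timeout values, and class are made up for illustration; this 
is not the proposed priority API, only an approximation of the requested 
behaviour:
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class PriorityConsumerSketch {

    public static void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Arrays.asList("HighPriorityTopic", "LowPriorityTopic"));
        final long starvationTimeoutMs = 30_000;  // prioritize web traffic for at most this long
        final long normalPeriodMs = 10_000;       // then consume both topics for this long

        long cycleStartMs = System.currentTimeMillis();
        while (true) {
            long elapsed = System.currentTimeMillis() - cycleStartMs;
            Set<TopicPartition> lowPriority = consumer.assignment().stream()
                    .filter(tp -> tp.topic().equals("LowPriorityTopic"))
                    .collect(Collectors.toSet());

            if (elapsed < starvationTimeoutMs) {
                consumer.pause(lowPriority);    // prioritized phase: fetch only HighPriorityTopic
            } else if (elapsed < starvationTimeoutMs + normalPeriodMs) {
                consumer.resume(lowPriority);   // starvation timeout reached: consume both topics
            } else {
                cycleStartMs = System.currentTimeMillis();  // start a new prioritization cycle
            }

            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                process(record);
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // application-specific handling (preview generation, indexing, ...)
    }
}
{code}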


was (Author: balaprassanna):
[~nafshartous] Sorry about the delay in reply. We use Kafka to process the 
asynchronous events of our Document Management System such as preview 
generation, indexing for search etc. The traffic gets generated via Web and 
Desktop Sync application. In such cases, we had to prioritise the traffic from 
web and consume them first. Though this may lead to starvation of events from 
sync, an API as such will help us effectively manage the available resources 
and prioritise our traffic without allocating a separate server to handle the 
sync load.

> Priorities for Source Topics
> 
>
> Key: KAFKA-6690
> URL: https://issues.apache.org/jira/browse/KAFKA-6690
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Bala Prassanna I
>Assignee: Nick Afshartous
>Priority: Major
>
> We often encounter use cases where we need to prioritise source topics. If a 
> consumer is listening to more than one topic, say, HighPriorityTopic and 
> LowPriorityTopic, it should consume events from LowPriorityTopic only when 
> all the events from HighPriorityTopic are consumed. This is needed in Kafka 
> Streams processor topologies as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-22 Thread Ewen Cheslack-Postava (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659935#comment-16659935
 ] 

Ewen Cheslack-Postava commented on KAFKA-7481:
--

[~ijuma] As it stands today, is the only use case for 
inter.broker.protocol.version to hold back the version during a rolling 
upgrade? Do we even really need this with KIP-35? It sounds like, aside from 
the two-rolling-bounce upgrade, you expect people wouldn't generally be using 
this today (given you say the benefit is minimal)?

I'm fine overloading one of them if we think the existing use case for it is 
either OK to compromise with additional restrictions OR the tradeoffs aren't 
weird like they are for log.message.format.version. Test surface area is a 
fair point; regarding complexity, I guess I don't see that much of a difference 
for users, since they just copy/paste 2 lines rather than 1 and otherwise 
probably follow the directions blindly.
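
For reference, the "2 lines" in question are the broker settings that get 
pinned in server.properties during the first rolling bounce of an upgrade, 
roughly as follows (the version values are examples and depend on the release 
being upgraded from):
{code}
# server.properties during the first rolling bounce of an upgrade (example values)
inter.broker.protocol.version=2.0
log.message.format.version=2.0
{code}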

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7299) batch LeaderAndIsr requests during auto preferred leader election

2018-10-22 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-7299:
---
Fix Version/s: 2.0.1
   1.1.2

Kevin, since this is a small patch, I merged it to both the 1.1 and 2.0 branches.

> batch LeaderAndIsr requests during auto preferred leader election
> -
>
> Key: KAFKA-7299
> URL: https://issues.apache.org/jira/browse/KAFKA-7299
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jun Rao
>Assignee: huxihx
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently, in KafkaController.checkAndTriggerAutoLeaderRebalance(), we call 
> onPreferredReplicaElection() one partition at a time. This means that the 
> controller will be sending LeaderAndIsrRequest one partition at a time. It 
> would be more efficient to call onPreferredReplicaElection() for a batch of 
> partitions to reduce the number of LeaderAndIsrRequests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-22 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659857#comment-16659857
 ] 

Matthias J. Sax commented on KAFKA-7510:


Thanks [~MrKafka]. Works for me. If you are quick, we can try to get this into 
the 2.0.1 bug fix release.

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error, as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage, 
> timestamp, topic* and *stackTrace*.
> {code:java}
> private  void recordSendError(
> final K key,
> final V value,
> final Long timestamp,
> final String topic,
> final Exception exception
> ) {
> String errorLogMessage = LOG_MESSAGE;
> String errorMessage = EXCEPTION_MESSAGE;
> if (exception instanceof RetriableException) {
> errorLogMessage += PARAMETER_HINT;
> errorMessage += PARAMETER_HINT;
> }
> log.error(errorLogMessage, key, value, timestamp, topic, 
> exception.toString());
> sendException = new StreamsException(
> String.format(
> errorMessage,
> logPrefix,
> "an error caught",
> key,
> value,
> timestamp,
> topic,
> exception.toString()
> ),
> exception);
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config

2018-10-22 Thread Sophie Qian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659824#comment-16659824
 ] 

Sophie Qian edited comment on KAFKA-7530 at 10/22/18 11:41 PM:
---

Our problem is with running Confluent Schema Registry in a docker container.

I got the following warning message after exporting 
{color:#14892c}SCHEMA_REGISTRY_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG="" 
{color}

{color:#d04437}[2018-10-22 17:58:10,058] WARN The configuration 
'ssl.endpoint.identification.algorithm.config' was supplied but isn't a known 
config. (org.apache.kafka.clients.admin.AdminClientConfig){color}

The value for ssl.endpoint.identification.algorithm.config in 
/etc/schema-registry/schema-registry.properties is not being taken into 
consideration; it's an empty string in the properties file 
({color:#14892c}ssl.endpoint.identification.algorithm.config={color}).

The Schema Registry log shows that ssl.endpoint.identification.algorithm is 
using "https":

[2018-10-22 23:26:43,039] INFO AdminClientConfig values:
 ..
 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 {color:#d04437}ssl.endpoint.identification.algorithm = https{color}


was (Author: kafka):
Our problem is with running Confluent Schema Registry in a docker container.

I got the following warning message after exporting 
{color:#14892c}SCHEMA_REGISTRY_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG="" 
{color}

{color:#d04437}[2018-10-22 17:58:10,058] WARN The configuration 
'ssl.endpoint.identification.algorithm.config' was supplied but isn't a known 
config. (org.apache.kafka.clients.admin.AdminClientConfig){color}

The value for ssl.endpoint.identification.algorithm.config in 
/etc/schema-registry/schema-registry.properties is not being taken into 
consideration; it's an empty string in the properties file 
({color:#14892c}ssl.endpoint.identification.algorithm.config={color}).

The Schema Registry log shows that ssl.endpoint.identification.algorithm is 
using "https":

[2018-10-22 23:26:43,039] INFO AdminClientConfig values:
 ..
 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 {color:#d04437}ssl.endpoint.identification.algorithm = https{color}

 

> Need to allow overwrite ssl.endpoint.identification.algorithm.config
> 
>
> Key: KAFKA-7530
> URL: https://issues.apache.org/jira/browse/KAFKA-7530
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Sophie Qian
>Priority: Major
>
> We are in the process of upgrading our system to use Confluent 5.0.0 (which 
> is using Kafka 2.0.0). I found out that SslConfigs 
> (clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has 
> the following change:
>  
> {color:#14892c}KAFKA-3665: Enable TLS hostname verification by default 
> (KIP-294) (#4956){color}
> {color:#14892c}Make HTTPS the default 
> ssl.endpoint.identification.algorithm.{color}
>  
> But the user cannot overwrite ssl.endpoint.identification.algorithm; only the 
> following values are reconfigurable.
> {color:#205081}public static final Set RECONFIGURABLE_CONFIGS = 
> Utils.mkSet(
> ¦ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_KEY_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);{color}
>  
> Pls make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG 
> reconfigurable. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config

2018-10-22 Thread Sophie Qian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659824#comment-16659824
 ] 

Sophie Qian commented on KAFKA-7530:


Our problem is with running Confluent Schema Registry in a docker container.

I got the following warning message after exporting 
{color:#14892c}SCHEMA_REGISTRY_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG="" 
{color}

{color:#d04437}[2018-10-22 17:58:10,058] WARN The configuration 
'ssl.endpoint.identification.algorithm.config' was supplied but isn't a known 
config. (org.apache.kafka.clients.admin.AdminClientConfig){color}

The value for ssl.endpoint.identification.algorithm.config in 
/etc/schema-registry/schema-registry.properties is not being taken into 
consideration; it's an empty string in the properties file 
({color:#14892c}ssl.endpoint.identification.algorithm.config={color}).

The Schema Registry log shows that ssl.endpoint.identification.algorithm is 
using "https":

[2018-10-22 23:26:43,039] INFO AdminClientConfig values:
 ..
 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 {color:#d04437}ssl.endpoint.identification.algorithm = https{color}

 

> Need to allow overwrite ssl.endpoint.identification.algorithm.config
> 
>
> Key: KAFKA-7530
> URL: https://issues.apache.org/jira/browse/KAFKA-7530
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Sophie Qian
>Priority: Major
>
> We are in the process of upgrading our system to use Confluent 5.0.0 (which 
> is using Kafka 2.0.0). I found out that SslConfigs 
> (clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has 
> the following change:
>  
> {color:#14892c}KAFKA-3665: Enable TLS hostname verification by default 
> (KIP-294) (#4956){color}
> {color:#14892c}Make HTTPS the default 
> ssl.endpoint.identification.algorithm.{color}
>  
> But the user cannot overwrite ssl.endpoint.identification.algorithm; only the 
> following values are reconfigurable.
> {color:#205081}public static final Set RECONFIGURABLE_CONFIGS = 
> Utils.mkSet(
> ¦ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_KEY_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);{color}
>  
> Pls make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG 
> reconfigurable. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6690) Priorities for Source Topics

2018-10-22 Thread Bala Prassanna I (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659806#comment-16659806
 ] 

Bala Prassanna I commented on KAFKA-6690:
-

[~nafshartous] Sorry about the delay in reply. We use Kafka to process the 
asynchronous events of our Document Management System such as preview 
generation, indexing for search etc. The traffic gets generated via Web and 
Desktop Sync application. In such cases, we had to prioritise the traffic from 
web and consume it first. Though this may lead to starvation of events from 
sync, an API as such will help us effectively manage the available resources 
and prioritise our traffic without allocating a separate server to handle the 
sync load.

> Priorities for Source Topics
> 
>
> Key: KAFKA-6690
> URL: https://issues.apache.org/jira/browse/KAFKA-6690
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Bala Prassanna I
>Assignee: Nick Afshartous
>Priority: Major
>
> We often encounter use cases where we need to prioritise source topics. If a 
> consumer is listening to more than one topic, say, HighPriorityTopic and 
> LowPriorityTopic, it should consume events from LowPriorityTopic only when 
> all the events from HighPriorityTopic are consumed. This is needed in Kafka 
> Streams processor topologies as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config

2018-10-22 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659778#comment-16659778
 ] 

Ismael Juma commented on KAFKA-7530:


Via server.properties.

> Need to allow overwrite ssl.endpoint.identification.algorithm.config
> 
>
> Key: KAFKA-7530
> URL: https://issues.apache.org/jira/browse/KAFKA-7530
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Sophie Qian
>Priority: Major
>
> We are in the process of upgrading our system to use Confluent 5.0.0 (which 
> is using Kafka 2.0.0). I found out that SslConfigs 
> (clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has 
> the following change:
>  
> {color:#14892c}KAFKA-3665: Enable TLS hostname verification by default 
> (KIP-294) (#4956){color}
> {color:#14892c}Make HTTPS the default 
> ssl.endpoint.identification.algorithm.{color}
>  
> But the user cannot overwrite ssl.endpoint.identification.algorithm; only the 
> following values are reconfigurable.
> {color:#205081}public static final Set RECONFIGURABLE_CONFIGS = 
> Utils.mkSet(
> ¦ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_KEY_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);{color}
>  
> Pls make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG 
> reconfigurable. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config

2018-10-22 Thread Sophie Qian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659777#comment-16659777
 ] 

Sophie Qian commented on KAFKA-7530:


[~ijuma] How can we override this config? Can you provide some details? 
Thanks

> Need to allow overwrite ssl.endpoint.identification.algorithm.config
> 
>
> Key: KAFKA-7530
> URL: https://issues.apache.org/jira/browse/KAFKA-7530
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Sophie Qian
>Priority: Major
>
> We are in the process of upgrading our system to use Confluent 5.0.0 (which 
> is using Kafka 2.0.0). I found out that SslConfigs 
> (clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has 
> the following change:
>  
> {color:#14892c}KAFKA-3665: Enable TLS hostname verification by default 
> (KIP-294) (#4956){color}
> {color:#14892c}Make HTTPS the default 
> ssl.endpoint.identification.algorithm.{color}
>  
> But the user cannot overwrite ssl.endpoint.identification.algorithm; only the 
> following values are reconfigurable.
> {color:#205081}public static final Set RECONFIGURABLE_CONFIGS = 
> Utils.mkSet(
> ¦ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_KEY_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);{color}
>  
> Pls make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG 
> reconfigurable. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config

2018-10-22 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659772#comment-16659772
 ] 

Ismael Juma commented on KAFKA-7530:


To be clear, you can override the config, you just can't do it dynamically.
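
For example, a static (non-dynamic) override on a Java client is just a regular 
config entry. A minimal sketch, assuming an SSL listener; the bootstrap address 
is a placeholder, and note the property name has no trailing ".config" suffix:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SslConfigs;

public class AdminClientSslOverride {
    public static AdminClient create() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093"); // placeholder
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL");
        // "ssl.endpoint.identification.algorithm": an empty value disables hostname
        // verification, i.e. the pre-KIP-294 default behaviour.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        return AdminClient.create(props);
    }
}
{code}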

> Need to allow overwrite ssl.endpoint.identification.algorithm.config
> 
>
> Key: KAFKA-7530
> URL: https://issues.apache.org/jira/browse/KAFKA-7530
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Sophie Qian
>Priority: Major
>
> We are in the process of upgrading our system to use Confluent 5.0.0 (which 
> is using Kafka 2.0.0). I found out that SslConfigs 
> (clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has 
> the following change:
>  
> {color:#14892c}KAFKA-3665: Enable TLS hostname verification by default 
> (KIP-294) (#4956){color}
> {color:#14892c}Make HTTPS the default 
> ssl.endpoint.identification.algorithm.{color}
>  
> But the user cannot overwrite ssl.endpoint.identification.algorithm; only the 
> following values are reconfigurable.
> {color:#205081}public static final Set RECONFIGURABLE_CONFIGS = 
> Utils.mkSet(
> ¦ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_KEY_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);{color}
>  
> Pls make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG 
> reconfigurable. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config

2018-10-22 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-7530:
--

Assignee: Ismael Juma

> Need to allow overwrite ssl.endpoint.identification.algorithm.config
> 
>
> Key: KAFKA-7530
> URL: https://issues.apache.org/jira/browse/KAFKA-7530
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Sophie Qian
>Assignee: Ismael Juma
>Priority: Major
>
> We are in the process of upgrading our system to use Confluent 5.0.0 (which 
> is using Kafka 2.0.0). I found out that SslConfigs 
> (clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has 
> the following change:
>  
> {color:#14892c}KAFKA-3665: Enable TLS hostname verification by default 
> (KIP-294) (#4956){color}
> {color:#14892c}Make HTTPS the default 
> ssl.endpoint.identification.algorithm.{color}
>  
> But the user cannot overwrite ssl.endpoint.identification.algorithm; only the 
> following values are reconfigurable.
> {color:#205081}public static final Set RECONFIGURABLE_CONFIGS = 
> Utils.mkSet(
> ¦ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_KEY_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);{color}
>  
> Pls make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG 
> reconfigurable. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config

2018-10-22 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-7530:
--

Assignee: (was: Ismael Juma)

> Need to allow overwrite ssl.endpoint.identification.algorithm.config
> 
>
> Key: KAFKA-7530
> URL: https://issues.apache.org/jira/browse/KAFKA-7530
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Sophie Qian
>Priority: Major
>
> We are in the process of upgrading our system to use Confluent 5.0.0 (which 
> is using Kafka 2.0.0). I found out that SslConfigs 
> (clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has 
> the following change:
>  
> {color:#14892c}KAFKA-3665: Enable TLS hostname verification by default 
> (KIP-294) (#4956){color}
> {color:#14892c}Make HTTPS the default 
> ssl.endpoint.identification.algorithm.{color}
>  
> But the user cannot overwrite ssl.endpoint.identification.algorithm; only the 
> following values are reconfigurable.
> {color:#205081}public static final Set RECONFIGURABLE_CONFIGS = 
> Utils.mkSet(
> ¦ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_KEY_PASSWORD_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
> ¦ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);{color}
>  
> Pls make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG 
> reconfigurable. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-22 Thread Mr Kafka (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659769#comment-16659769
 ] 

Mr Kafka edited comment on KAFKA-7510 at 10/22/18 10:33 PM:


[~mjsax]
{quote}Why does it not contradict KAFKA-6538? It is about adding key/value in 
human readable form to exception messages that end up in the logs. While this 
ticket is about removing key/value data from the logs. What do you mean by "it 
only affects it's implementation"?
{quote}
You can still add key/value/headers or as much useful data as you want to the 
log messages; those log messages will be at TRACE level, and a user has to 
explicitly turn TRACE level on to see the extra information.

Happy to create a PR to move the key/value logging in RecordCollectorImpl to 
trace, as it directly blocks me and I'm likely to do this in a private fork 
regardless until it is in an official release. Not so interested in creating a 
KIP and long discussions about moving sensitive log data to a different log 
level, which is as simple as adding a *log.trace(...)* below the 
*log.error(...)* statements.
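
A minimal sketch of that change inside recordSendError() might look like the 
following (illustrative only, not an actual patch; the message text is made up):
{code:java}
// ERROR without the payload; key/value only at TRACE so sensitive data stays
// out of the default log output.
log.error("Error sending record to topic {} (timestamp {}): {}",
        topic, timestamp, exception.toString());
log.trace("Content of failed record for topic {}: key {} value {}", topic, key, value);
{code}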


was (Author: mrkafka):
[~mjsax]

{quote}

Why does it not contradict KAFKA-6538? It is about adding key/value in human 
readable form to exception messages that end up in the logs. While this ticket 
is about removing key/value data from the logs. What do you mean by "it only 
affects it's implementation"?

{quote}

You can still add key/value/headers or as much useful data as you want to the 
log messages; those log messages will be at TRACE level, and a user has to 
explicitly turn TRACE level on to see the extra information.

Happy to create a PR to move the key/value logging in RecordCollectorImpl to 
trace, as it directly blocks me and I'm likely to do this in a private fork 
regardless until it is done. Not so interested in creating a KIP and long 
discussions about moving sensitive log data to a different log level, which is 
as simple as adding a *log.trace(...)* below the *log.error(...)* statements.

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error, as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage, 
> timestamp, topic* and *stackTrace*.
> {code:java}
> private  void recordSendError(
> final K key,
> final V value,
> final Long timestamp,
> final String topic,
> final Exception exception
> ) {
> String errorLogMessage = LOG_MESSAGE;
> String errorMessage = EXCEPTION_MESSAGE;
> if (exception instanceof RetriableException) {
> errorLogMessage += PARAMETER_HINT;
> errorMessage += PARAMETER_HINT;
> }
> log.error(errorLogMessage, key, value, timestamp, topic, 
> exception.toString());
> sendException = new StreamsException(
> String.format(
> errorMessage,
> logPrefix,
> "an error caught",
> key,
> value,
> timestamp,
> topic,
> exception.toString()
> ),
> exception);
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-22 Thread Mr Kafka (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659769#comment-16659769
 ] 

Mr Kafka commented on KAFKA-7510:
-

[~mjsax]

{quote}

Why does it not contradict KAFKA-6538? It is about adding key/value in human 
readable form to exception messages that end up in the logs. While this ticket 
is about removing key/value data from the logs. What do you mean by "it only 
affects it's implementation"?

{quote}

You can still add key/value/headers or as much useful data as you want to the 
log messages; those log messages will be at TRACE level, and a user has to 
explicitly turn TRACE level on to see the extra information.

Happy to create a PR to move the key/value logging in RecordCollectorImpl to 
trace, as it directly blocks me and I'm likely to do this in a private fork 
regardless until it is done. Not so interested in creating a KIP and long 
discussions about moving sensitive log data to a different log level, which is 
as simple as adding a *log.trace(...)* below the *log.error(...)* statements.

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error, as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage, 
> timestamp, topic* and *stackTrace*.
> {code:java}
> private  void recordSendError(
> final K key,
> final V value,
> final Long timestamp,
> final String topic,
> final Exception exception
> ) {
> String errorLogMessage = LOG_MESSAGE;
> String errorMessage = EXCEPTION_MESSAGE;
> if (exception instanceof RetriableException) {
> errorLogMessage += PARAMETER_HINT;
> errorMessage += PARAMETER_HINT;
> }
> log.error(errorLogMessage, key, value, timestamp, topic, 
> exception.toString());
> sendException = new StreamsException(
> String.format(
> errorMessage,
> logPrefix,
> "an error caught",
> key,
> value,
> timestamp,
> topic,
> exception.toString()
> ),
> exception);
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7530) Need to allow overwrite ssl.endpoint.identification.algorithm.config

2018-10-22 Thread Sophie Qian (JIRA)
Sophie Qian created KAFKA-7530:
--

 Summary: Need to allow overwrite 
ssl.endpoint.identification.algorithm.config
 Key: KAFKA-7530
 URL: https://issues.apache.org/jira/browse/KAFKA-7530
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.0.0
Reporter: Sophie Qian


We are in the process of upgrading our system to use Confluent 5.0.0 (which is 
using Kafka 2.0.0). I found out that SslConfigs 
(clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java) has 
the following change:

 

{color:#14892c}KAFKA-3665: Enable TLS hostname verification by default 
(KIP-294) (#4956){color}

{color:#14892c}Make HTTPS the default 
ssl.endpoint.identification.algorithm.{color}

 

But the user cannot overwrite ssl.endpoint.identification.algorithm; only the 
following values are reconfigurable.

{color:#205081}public static final Set RECONFIGURABLE_CONFIGS = 
Utils.mkSet(
¦ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
¦ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
¦ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
¦ SslConfigs.SSL_KEY_PASSWORD_CONFIG,
¦ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
¦ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
¦ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);{color}

 

Pls make SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG 
reconfigurable. 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6690) Priorities for Source Topics

2018-10-22 Thread Nick Afshartous (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659562#comment-16659562
 ] 

Nick Afshartous commented on KAFKA-6690:


[~balaprassanna] Just checking again whether you can help by elaborating on 
your use cases? This would make it easier to facilitate a vote on this feature 
request.

> Priorities for Source Topics
> 
>
> Key: KAFKA-6690
> URL: https://issues.apache.org/jira/browse/KAFKA-6690
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Bala Prassanna I
>Assignee: Nick Afshartous
>Priority: Major
>
> We often encounter use cases where we need to prioritise source topics. If a 
> consumer is listening to more than one topic, say, HighPriorityTopic and 
> LowPriorityTopic, it should consume events from LowPriorityTopic only when 
> all the events from HighPriorityTopic are consumed. This is needed in Kafka 
> Streams processor topologies as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-22 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659432#comment-16659432
 ] 

Matthias J. Sax commented on KAFKA-7510:


Why does it not contradict KAFKA-6538? That ticket is about adding key/value in 
human-readable form to exception messages that end up in the logs, while this 
ticket is about removing key/value data from the logs. What do you mean by "it 
only affects it's implementation"?

For all the other points: I think discussing this on the ticket is not the 
right place. Would you like to drive a discussion, [~MrKafka]? If yes, please 
send a "[DISCUSS]" email to the dev mailing list or maybe propose a KIP (cf. 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals)? 
The goal should be to agree on a global strategy for how to handle this, and 
this should be documented in the wiki IMHO so we have a reference. For concrete 
violations of the strategy (e.g., in RecordCollectorImpl) we can create 
individual tickets like this one to close the gaps.

Thoughts?

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error, as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage, 
> timestamp, topic* and *stackTrace*.
> {code:java}
> private  void recordSendError(
> final K key,
> final V value,
> final Long timestamp,
> final String topic,
> final Exception exception
> ) {
> String errorLogMessage = LOG_MESSAGE;
> String errorMessage = EXCEPTION_MESSAGE;
> if (exception instanceof RetriableException) {
> errorLogMessage += PARAMETER_HINT;
> errorMessage += PARAMETER_HINT;
> }
> log.error(errorLogMessage, key, value, timestamp, topic, 
> exception.toString());
> sendException = new StreamsException(
> String.format(
> errorMessage,
> logPrefix,
> "an error caught",
> key,
> value,
> timestamp,
> topic,
> exception.toString()
> ),
> exception);
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659332#comment-16659332
 ] 

ASF GitHub Bot commented on KAFKA-7519:
---

ijuma closed pull request #5820: KAFKA-7519 Clear pending transaction state 
when expiration fails
URL: https://github.com/apache/kafka/pull/5820
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 574c64e2c65..589407c2a2c 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -166,37 +166,32 @@ class TransactionStateManager(brokerId: Int,
 (topicPartition, records)
   }
 
-
 def removeFromCacheCallback(responses: collection.Map[TopicPartition, 
PartitionResponse]): Unit = {
   responses.foreach { case (topicPartition, response) =>
-response.error match {
-  case Errors.NONE =>
-inReadLock(stateLock) {
-  val toRemove = 
transactionalIdByPartition(topicPartition.partition())
-  transactionMetadataCache.get(topicPartition.partition)
-.foreach { txnMetadataCacheEntry =>
-  toRemove.foreach { idCoordinatorEpochAndMetadata =>
-val txnMetadata = 
txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
-txnMetadata.inLock {
-  if (txnMetadataCacheEntry.coordinatorEpoch == 
idCoordinatorEpochAndMetadata.coordinatorEpoch
-&& txnMetadata.pendingState.contains(Dead)
-&& txnMetadata.producerEpoch == 
idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-  )
-
txnMetadataCacheEntry.metadataPerTransactionalId.remove(idCoordinatorEpochAndMetadata.transactionalId)
-  else {
- debug(s"failed to remove expired transactionalId: 
${idCoordinatorEpochAndMetadata.transactionalId}" +
-   s" from cache. pendingState: 
${txnMetadata.pendingState} producerEpoch: ${txnMetadata.producerEpoch}" +
-   s" expected producerEpoch: 
${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}" +
-   s" coordinatorEpoch: 
${txnMetadataCacheEntry.coordinatorEpoch} expected coordinatorEpoch: " +
-   
s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-txnMetadata.pendingState = None
-  }
-}
-  }
+inReadLock(stateLock) {
+  val toRemove = 
transactionalIdByPartition(topicPartition.partition)
+  transactionMetadataCache.get(topicPartition.partition).foreach { 
txnMetadataCacheEntry =>
+toRemove.foreach { idCoordinatorEpochAndMetadata =>
+  val transactionalId = 
idCoordinatorEpochAndMetadata.transactionalId
+  val txnMetadata = 
txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
+  txnMetadata.inLock {
+if (txnMetadataCacheEntry.coordinatorEpoch == 
idCoordinatorEpochAndMetadata.coordinatorEpoch
+  && txnMetadata.pendingState.contains(Dead)
+  && txnMetadata.producerEpoch == 
idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
+  && response.error == Errors.NONE) {
+  
txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
+} else {
+  warn(s"Failed to remove expired transactionalId: 
$transactionalId" +
+s" from cache. Tombstone append error code: 
${response.error}," +
+s" pendingState: ${txnMetadata.pendingState}, 
producerEpoch: ${txnMetadata.producerEpoch}," +
+s" expected producerEpoch: 
${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
+s" coordinatorEpoch: 
${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
+s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
+  txnMetadata.pendingState = None
 }
+  }
 }
-  case _ =>
-  

[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-22 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659227#comment-16659227
 ] 

Ismael Juma commented on KAFKA-7481:


The key insight for me is that the benefit of having a separate config for 
the stateless parts is minimal, and increasing the testing surface area for 
both upgrades and downgrades is not desirable (in addition to the additional 
complexity for the user).

 

Yes, these things should ideally not be done in minor releases, but we are 
where we are.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7529) Kafka Connect JDBC doesn't push new records to Kafka Topic unless the connector is restarted

2018-10-22 Thread Kashyap Ivaturi (JIRA)
Kashyap Ivaturi created KAFKA-7529:
--

 Summary: Kafka Connect JDBC doesn't push new records to Kafka 
Topic unless the connector is restarted
 Key: KAFKA-7529
 URL: https://issues.apache.org/jira/browse/KAFKA-7529
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Kashyap Ivaturi


Hi,

We have a Kafka Connect JDBC Source Connector which keeps polling for new 
records in an Oracle table every minute and pushes the new records to a Kafka 
topic. New records are determined by an incrementing column.

In general everything works well, but once in a while we see that even though 
there were new records with the incrementing column, those records don't get 
pushed to the topic. There is no clue of any error in the logs and the 
connector is in the running state. Only after we restart the connector are the 
new records pushed to the topic.

Any idea in what situation this can happen?

Rgds
Kashyap.
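
For reference, a typical configuration for such a source connector is roughly 
of this shape (property names from the Confluent JDBC source connector; all 
values here are placeholders):
{code}
name=oracle-incrementing-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@//db-host:1521/SERVICE
table.whitelist=MY_TABLE
mode=incrementing
incrementing.column.name=ID
topic.prefix=oracle-
poll.interval.ms=60000
{code}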



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7528) Make Min and Max metrics' default value consistent with each other

2018-10-22 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7528:
--

 Summary: Make Min and Max metrics' default value consistent with 
each other
 Key: KAFKA-7528
 URL: https://issues.apache.org/jira/browse/KAFKA-7528
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski
 Fix For: 2.2.0


KIP-386: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-22 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7519:
---
Fix Version/s: 2.0.1

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Priority: Blocker
> Fix For: 2.0.1, 2.1.0
>
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
> 

[jira] [Commented] (KAFKA-7501) double deallocation of producer batch upon expiration of inflight requests and error response

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658791#comment-16658791
 ] 

ASF GitHub Bot commented on KAFKA-7501:
---

ijuma closed pull request #5807: KAFKA-7501: fix producer batch double 
deallocation when receiving message too large error on expired batch
URL: https://github.com/apache/kafka/pull/5807
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 6e08185611d..0adbbf962cd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -72,9 +72,8 @@
 private long lastAttemptMs;
 private long lastAppendTime;
 private long drainedMs;
-private String expiryErrorMessage;
 private boolean retry;
-private boolean reopened = false;
+private boolean reopened;
 
 public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder 
recordsBuilder, long createdMs) {
 this(tp, recordsBuilder, createdMs, false);
@@ -156,6 +155,13 @@ public void abort(RuntimeException exception) {
 completeFutureAndFireCallbacks(ProduceResponse.INVALID_OFFSET, 
RecordBatch.NO_TIMESTAMP, exception);
 }
 
+/**
+ * Return `true` if {@link #done(long, long, RuntimeException)} has been 
invoked at least once, `false` otherwise.
+ */
+public boolean isDone() {
+return finalState() != null;
+}
+
 /**
  * Finalize the state of a batch. Final state, once set, is immutable. 
This function may be called
  * once or twice on a batch. It may be called twice if
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index c50a85f06da..19d7af2e7a0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -184,9 +184,9 @@ public void maybeRemoveFromInflightBatches(ProducerBatch 
batch) {
 if 
(batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
 iter.remove();
 // expireBatches is called in Sender.sendProducerData, 
before client.poll.
-// The batch.finalState() == null invariant should 
always hold. An IllegalStateException
+// The !batch.isDone() invariant should always hold. 
An IllegalStateException
 // exception will be thrown if the invariant is 
violated.
-if (batch.finalState() == null) {
+if (!batch.isDone()) {
 expiredBatches.add(batch);
 } else {
 throw new 
IllegalStateException(batch.topicPartition + " batch created at " +
@@ -576,7 +576,7 @@ private void completeBatch(ProducerBatch batch, 
ProduceResponse.PartitionRespons
long now, long throttleUntilTimeMs) {
 Errors error = response.error;
 
-if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
+if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && 
!batch.isDone() &&
 (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || 
batch.isCompressed())) {
 // If the batch is too large, we split the batch and send the 
split batches again. We do not decrement
 // the retry attempts in this case.
@@ -726,7 +726,7 @@ private void failBatch(ProducerBatch batch, long 
baseOffset, long logAppendTime,
 private boolean canRetry(ProducerBatch batch, 
ProduceResponse.PartitionResponse response, long now) {
 return 
!batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
 batch.attempts() < this.retries &&
-batch.finalState() == null &&
+!batch.isDone() &&
 ((response.error.exception() instanceof RetriableException) ||
 (transactionManager != null && 
transactionManager.canRetry(response, batch)));
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 23ca2aeaed7..8a8ddd35836 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 

[jira] [Commented] (KAFKA-7525) Handling corrupt records

2018-10-22 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658756#comment-16658756
 ] 

Ismael Juma commented on KAFKA-7525:


That IllegalStateException is a clear bug. Please test with a more recent 
version as it may have been fixed already.

> Handling corrupt records
> 
>
> Key: KAFKA-7525
> URL: https://issues.apache.org/jira/browse/KAFKA-7525
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Affects Versions: 1.1.0
>Reporter: Katarzyna Solnica
>Priority: Major
>
> When Java consumer encounters a corrupt record on a partition it reads from, 
> it throws:
> {code:java}
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from XYZ. If needed, please seek past the record to continue 
> consumption.
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1125)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     (...)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> is less than the minimum record overhead (14){code}
> or:
> {code:java}
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:936)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     (...){code}
> 1. Could you consider throwing CorruptRecordException from 
> parseCompletedFetch() when error == Errors.CORRUPT_MESSAGE?
> 2. Seeking past the corrupt record means losing data. I've noticed that the 
> record is often correct on a follower ISR, and manually changing the partition 
> leader to the follower node solves the issue in case the partition is used by 
> a single consumer group. Couldn't the Kafka server discover such situations 
> and recover corrupt records from logs available on other ISRs somehow?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7525) Handling corrupt records

2018-10-22 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658747#comment-16658747
 ] 

Stanislav Kozlovski commented on KAFKA-7525:


 

 

 

Hi [~Solnica], thanks for the report!

Regarding 1. - there has been some ongoing work which changes what errors are 
thrown in the case where message corruption has been detected. The issue we 
currently have is that we don't provide an easy way to seek past the corrupt 
record itself. Here is the 
[KIP-344|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793]
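
In the meantime, the workaround the exception message hints at looks roughly 
like this on the application side. A sketch only: it assumes the affected 
TopicPartition is known (e.g. parsed from the exception message), and it 
knowingly skips, i.e. loses, the corrupt record:
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class SkipCorruptRecordSketch {

    public static ConsumerRecords<byte[], byte[]> pollSkippingCorrupt(
            KafkaConsumer<byte[], byte[]> consumer, TopicPartition suspect, long timeoutMs) {
        try {
            return consumer.poll(timeoutMs);
        } catch (KafkaException e) {
            // position() is the offset of the next record to be fetched, i.e. the one
            // that could not be read; seeking one past it skips (and loses) that record.
            long corruptOffset = consumer.position(suspect);
            consumer.seek(suspect, corruptOffset + 1);
            return consumer.poll(timeoutMs);
        }
    }
}
{code}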

 

> Handling corrupt records
> 
>
> Key: KAFKA-7525
> URL: https://issues.apache.org/jira/browse/KAFKA-7525
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Affects Versions: 1.1.0
>Reporter: Katarzyna Solnica
>Priority: Major
>
> When Java consumer encounters a corrupt record on a partition it reads from, 
> it throws:
> {code:java}
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from XYZ. If needed, please seek past the record to continue 
> consumption.
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1125)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     (...)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> is less than the minimum record overhead (14){code}
> or:
> {code:java}
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:936)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     (...){code}
> 1. Could you consider throwing CorruptRecordException from 
> parseCompletedFetch() when error == Errors.CORRUPT_MESSAGE?
> 2. Seeking past the corrupt record means losing data. I've noticed that the 
> record is often correct on a follower ISR, and manually changing the partition 
> leader to the follower node solves the issue in case the partition is used by 
> a single consumer group. Couldn't the Kafka server discover such situations 
> and recover corrupt records from logs available on other ISRs somehow?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7501) double deallocation of producer batch upon expiration of inflight requests and error response

2018-10-22 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7501:
---
Fix Version/s: (was: 2.0.1)

> double deallocation of producer batch upon expiration of inflight requests 
> and error response
> -
>
> Key: KAFKA-7501
> URL: https://issues.apache.org/jira/browse/KAFKA-7501
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Critical
> Fix For: 2.1.0
>
>
> The following event sequence will lead to double deallocation of a producer 
> batch.
> 1) A producer batch is sent and the response is not received. 
> 2) The in-flight producer batch is expired when deliveryTimeoutMs is reached. 
> The sender fails the producer batch via "failBatch" and the producer batch 
> is deallocated via "accumulator.deallocate(batch)". 
> 3) The response for the batch finally arrives after batch expiration, and the 
> response contains the error "Errors.MESSAGE_TOO_LARGE".
> 4) The producer batch is split and the original batch is deallocated a second 
> time. As a result, an "IllegalStateException" is raised.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)