[jira] [Updated] (KAFKA-7710) Poor Zookeeper ACL management with Kerberos

2018-12-05 Thread Mr Kafka (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mr Kafka updated KAFKA-7710:

Description: 
I have seen many organizations run many Kafka clusters. The simplest scenario 
is that you have a *kafka.dev.example.com* cluster and a 
*kafka.prod.example.com* cluster. In more extreme cases, teams within an 
organization may each run their own cluster.

When you enable Zookeeper ACLs in Kafka the ACL looks to be set to the 
principal (SPN) that is used to authenticate against Zookeeper.

For example I have brokers:
 * *01.kafka.dev.example.com*
 * *02.kafka.dev.example.com*
 * *03.kafka.dev.example.com*

On *01.kafka.dev.example.com* I run the security-migration tool as below:
{code:java}
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 
-Dzookeeper.sasl.clientconfig=ZkClient" zookeeper-security-migration 
--zookeeper.acl=secure --zookeeper.connect=a01.zookeeper.dev.example.com:2181
{code}
I end up with ACLs in Zookeeper as below:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka/01.kafka.dev.example.com@EXAMPLE
# : cdrwa
{code}
This ACL means no other broker in the cluster can access the znode in Zookeeper 
except broker 01.

To resolve the issue you need to set the below properties in Zookeeper's config:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = true
{code}
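These are read by Zookeeper as JVM system properties, so one common way to apply them (an assumption about your deployment; adjust for however you launch Zookeeper) is via SERVER_JVMFLAGS in java.env / zookeeper-env.sh:
{code:java}
# Illustrative only: export before starting zkServer.sh
export SERVER_JVMFLAGS="$SERVER_JVMFLAGS \
  -Dkerberos.removeHostFromPrincipal=true \
  -Dkerberos.removeRealmFromPrincipal=true"
{code}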
Now when Kafka sets ACLs they are stored as:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka
#: cdrwa
{code}
This now means any broker in the cluster can access the znode. It also means a 
dev Kafka broker can write to a "prod.zookeeper.example.com" Zookeeper host: 
when it authenticates with the SPN "kafka/01.kafka.dev.example.com" the host is 
dropped and it is authorized simply as the service principal kafka.

If your organization is flexible you may be able to create different Kerberos 
Realms per cluster and use:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = false{code}
That means ACLs will be in the format "kafka/REALM", so only brokers in the 
same realm can connect. The difficulty here is getting your average large 
organization's security team to agree to create ad hoc realms.

*Proposal*

Kafka should support setting ACLs for all known brokers in the cluster, i.e. 
the ACLs on a znode would contain:
{code:java}
kafka/01.kafka.dev.example.com@EXAMPLE
kafka/02.kafka.dev.example.com@EXAMPLE
kafka/03.kafka.dev.example.com@EXAMPLE{code}
With this, though, some kind of support will need to be added so that when a 
new broker is added to the cluster its host ACL gets added to the existing 
znodes.
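A minimal sketch of what setting such a multi-principal ACL could look like with the plain Zookeeper Java client (illustrative only: the znode path, connect string and SPNs are assumptions, and SASL authentication is presumed to already be configured for the client):
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

public class MultiBrokerZkAcl {
    public static void main(String[] args) throws Exception {
        // Hypothetical list of broker SPNs for the cluster.
        List<String> brokerPrincipals = Arrays.asList(
            "kafka/01.kafka.dev.example.com@EXAMPLE",
            "kafka/02.kafka.dev.example.com@EXAMPLE",
            "kafka/03.kafka.dev.example.com@EXAMPLE");

        ZooKeeper zk = new ZooKeeper("a01.zookeeper.dev.example.com:2181", 30000, event -> { });
        try {
            // One sasl ACL entry per broker principal, each with full (cdrwa) permissions.
            List<ACL> acls = new ArrayList<>();
            for (String principal : brokerPrincipals) {
                acls.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", principal)));
            }
            zk.setACL("/cluster", acls, -1); // -1 = any ACL version
        } finally {
            zk.close();
        }
    }
}
{code}
When a broker joins, the same list would need to be recomputed and re-applied to every secured znode, which is the extra support mentioned above.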

  was:
I have seen many organizations run many Kafka clusters. The simplest scenario 
is that you have a *kafka.dev.example.com* cluster and a 
*kafka.prod.example.com* cluster. In more extreme cases, teams within an 
organization may each run their own cluster.

 

When you enable Zookeeper ACLs in Kafka the ACL looks to be set to the 
principal (SPN) that is used to authenticate against Zookeeper.

For example I have brokers:
 * *01.kafka.dev.example.com*
 * *02.kafka.dev.example.com*
 * *03.kafka.dev.example.com*

On *01.kafka.dev.example.com* I run the security-migration tool as below:
{code:java}
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 
-Dzookeeper.sasl.clientconfig=ZkClient" zookeeper-security-migration 
--zookeeper.acl=secure --zookeeper.connect=a01.zookeeper.dev.example.com:2181
{code}
I end up with ACLs in Zookeeper as below:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka/01.kafka.dev.example.com@EXAMPLE
# : cdrwa
{code}
This ACL means no other broker in the cluster can access the znode in Zookeeper 
except broker 01.

To resolve the issue you need to set the below properties in Zookeeper's config:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = true
{code}
Now when Kafka sets ACLs they are stored as:

 


> Poor Zookeeper ACL management with Kerberos
> ---
>
> Key: KAFKA-7710
> URL: https://issues.apache.org/jira/browse/KAFKA-7710
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mr Kafka
>Priority: Major
>
> I have seen many organizations run many Kafka clusters. The simplest scenario 
> is that you have a *kafka.dev.example.com* cluster and a 
> *kafka.prod.example.com* cluster. In more extreme cases, teams within an 
> organization may each run their own cluster.
> When you enable Zookeeper ACLs in Kafka the ACL looks to be set to the 
> principal (SPN) that is used to authenticate against Zookeeper.
> For example I have brokers:
>  * *01.kafka.dev.example.com*
>  * *02.kafka.dev.example.com*
>  * *03.kafka.dev.example.com*
> 

[jira] [Updated] (KAFKA-7710) Poor Zookeeper ACL management with Kerberos

2018-12-05 Thread Mr Kafka (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mr Kafka updated KAFKA-7710:

Description: 
I have seen many organizations run many Kafka clusters. The simplest scenario 
is that you have a *kafka.dev.example.com* cluster and a 
*kafka.prod.example.com* cluster. In more extreme cases, teams within an 
organization may run their own individual clusters and want isolation.

When you enable Zookeeper ACLs in Kafka the ACL looks to be set to the 
principal (SPN) that is used to authenticate against Zookeeper.

For example I have brokers:
 * *01.kafka.dev.example.com*
 * *02.kafka.dev.example.com*
 * *03.kafka.dev.example.com*

On *01.kafka.dev.example.com* I run the security-migration tool as below:
{code:java}
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 
-Dzookeeper.sasl.clientconfig=ZkClient" zookeeper-security-migration 
--zookeeper.acl=secure --zookeeper.connect=a01.zookeeper.dev.example.com:2181
{code}
I end up with ACLs in Zookeeper as below:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka/01.kafka.dev.example.com@EXAMPLE
# : cdrwa
{code}
This ACL means no other broker in the cluster can access the znode in Zookeeper 
except broker 01.

To resolve the issue you need to set the below properties in Zookeeper's config:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = true
{code}
Now when Kafka sets ACLs they are stored as:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka
#: cdrwa
{code}
This now means any broker in the cluster can access the znode. It also means a 
dev Kafka broker can write to a "prod.zookeeper.example.com" Zookeeper host: 
when it authenticates with the SPN "kafka/01.kafka.dev.example.com" the host is 
dropped and it is authorized simply as the service principal kafka.

If your organization is flexible you may be able to create different Kerberos 
Realms per cluster and use:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = false{code}
That means ACLs will be in the format "kafka/REALM", so only brokers in the 
same realm can connect. The difficulty here is getting your average large 
organization's security team to agree to create ad hoc realms.

*Proposal*

Kafka should support setting ACLs for all known brokers in the cluster, i.e. 
the ACLs on a znode would contain:
{code:java}
kafka/01.kafka.dev.example.com@EXAMPLE
kafka/02.kafka.dev.example.com@EXAMPLE
kafka/03.kafka.dev.example.com@EXAMPLE{code}
With this, though, some kind of support will need to be added so that when a 
new broker joins the cluster its host ACL gets added to the existing znodes.

  was:
I have seen many organizations run many Kafka clusters. The simplest scenario 
is that you have a *kafka.dev.example.com* cluster and a 
*kafka.prod.example.com* cluster. In more extreme cases, teams within an 
organization may each run their own cluster.

When you enable Zookeeper ACLs in Kafka the ACL looks to be set to the 
principal (SPN) that is used to authenticate against Zookeeper.

For example I have brokers:
 * *01.kafka.dev.example.com*
 * *02.kafka.dev.example.com*
 * *03.kafka.dev.example.com*

On *01.kafka.dev.example.com* I run the security-migration tool as below:
{code:java}
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 
-Dzookeeper.sasl.clientconfig=ZkClient" zookeeper-security-migration 
--zookeeper.acl=secure --zookeeper.connect=a01.zookeeper.dev.example.com:2181
{code}
I end up with ACLs in Zookeeper as below:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka/01.kafka.dev.example.com@EXAMPLE
# : cdrwa
{code}
This ACL means no other broker in the cluster can access the znode in Zookeeper 
except broker 01.

To resolve the issue you need to set the below properties in Zookeeper's config:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = true
{code}
Now when Kafka sets ACLs they are stored as:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka
#: cdrwa
{code}
This now means any broker in the cluster can access the znode. It also means a 
dev Kafka broker can write to a "prod.zookeeper.example.com" Zookeeper host: 
when it authenticates with the SPN "kafka/01.kafka.dev.example.com" the host is 
dropped and it is authorized simply as the service principal kafka.

If your organization is flexible you may be able to create different Kerberos 
Realms per cluster and use:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = false{code}
That means ACLs will be in the format "kafka/REALM", so only brokers in the 
same realm can connect. The difficulty here is getting your average large 
organization's security team to agree to create ad hoc realms.

*Proposal*

Kafka support setting ACLs 

[jira] [Updated] (KAFKA-7710) Poor Zookeeper ACL management with Kerberos

2018-12-05 Thread Mr Kafka (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mr Kafka updated KAFKA-7710:

Description: 
I have seen many organizations run many Kafka clusters. The simplest scenario 
is that you have a *kafka.dev.example.com* cluster and a 
*kafka.prod.example.com* cluster. In more extreme cases, teams within an 
organization may each run their own cluster.

When you enable Zookeeper ACLs in Kafka the ACL looks to be set to the 
principal (SPN) that is used to authenticate against Zookeeper.

For example I have brokers:
 * *01.kafka.dev.example.com*
 * *02.kafka.dev.example.com*
 * *03.kafka.dev.example.com*

On *01.kafka.dev.example.com* I run the security-migration tool as below:
{code:java}
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 
-Dzookeeper.sasl.clientconfig=ZkClient" zookeeper-security-migration 
--zookeeper.acl=secure --zookeeper.connect=a01.zookeeper.dev.example.com:2181
{code}
I end up with ACLs in Zookeeper as below:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka/01.kafka.dev.example.com@EXAMPLE
# : cdrwa
{code}
This ACL means no other broker in the cluster can access the znode in Zookeeper 
except broker 01.

To resolve the issue you need to set the below properties in Zookeeper's config:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = true
{code}
Now when Kafka sets ACLs they are stored as:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka
#: cdrwa
{code}
This now means any broker in the cluster can access the znode. It also means a 
dev Kafka broker can write to a "prod.zookeeper.example.com" Zookeeper host: 
when it authenticates with the SPN "kafka/01.kafka.dev.example.com" the host is 
dropped and it is authorized simply as the service principal kafka.

If your organization is flexible you may be able to create different Kerberos 
Realms per cluster and use:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = false{code}
That means ACLs will be in the format "kafka/REALM", so only brokers in the 
same realm can connect. The difficulty here is getting your average large 
organization's security team to agree to create ad hoc realms.

*Proposal*

Kafka should support setting ACLs for all known brokers in the cluster, i.e. 
the ACLs on a znode would contain:
{code:java}
kafka/01.kafka.dev.example.com@EXAMPLE
kafka/02.kafka.dev.example.com@EXAMPLE
kafka/03.kafka.dev.example.com@EXAMPLE{code}
With this, though, some kind of support will need to be added so that when a 
new broker joins the cluster its host ACL gets added to the existing znodes.

  was:
I have seen many organizations run many Kafka clusters. The simplest scenario 
is that you have a *kafka.dev.example.com* cluster and a 
*kafka.prod.example.com* cluster. In more extreme cases, teams within an 
organization may each run their own cluster.

When you enable Zookeeper ACLs in Kafka the ACL looks to be set to the 
principal (SPN) that is used to authenticate against Zookeeper.

For example I have brokers:
 * *01.kafka.dev.example.com*
 * *02.kafka.dev.example.com*
 * *03.kafka.dev.example.com*

On *01.kafka.dev.example.com* I run the security-migration tool as below:
{code:java}
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 
-Dzookeeper.sasl.clientconfig=ZkClient" zookeeper-security-migration 
--zookeeper.acl=secure --zookeeper.connect=a01.zookeeper.dev.example.com:2181
{code}
I end up with ACLs in Zookeeper as below:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka/01.kafka.dev.example.com@EXAMPLE
# : cdrwa
{code}
This ACL means no other broker in the cluster can access the znode in Zookeeper 
except broker 01.

To resolve the issue you need to set the below properties in Zookeeper's config:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = true
{code}
Now when Kafka sets ACLs they are stored as:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka
#: cdrwa
{code}
This now means any broker in the cluster can access the znode. It also means a 
dev Kafka broker can write to a "prod.zookeeper.example.com" Zookeeper host: 
when it authenticates with the SPN "kafka/01.kafka.dev.example.com" the host is 
dropped and it is authorized simply as the service principal kafka.

If your organization is flexible you may be able to create different Kerberos 
Realms per cluster and use:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = false{code}
That means ACLs will be in the format "kafka/REALM", so only brokers in the 
same realm can connect. The difficulty here is getting your average large 
organization's security team to agree to create ad hoc realms.

*Proposal*

Kafka support setting ACLs for all known 

[jira] [Created] (KAFKA-7710) Poor Zookeeper ACL management with Kerberos

2018-12-05 Thread Mr Kafka (JIRA)
Mr Kafka created KAFKA-7710:
---

 Summary: Poor Zookeeper ACL management with Kerberos
 Key: KAFKA-7710
 URL: https://issues.apache.org/jira/browse/KAFKA-7710
 Project: Kafka
  Issue Type: Bug
Reporter: Mr Kafka


I have seen many organizations run many Kafka clusters. The simplest scenario 
is that you have a *kafka.dev.example.com* cluster and a 
*kafka.prod.example.com* cluster. In more extreme cases, teams within an 
organization may each run their own cluster.

 

When you enable Zookeeper ACLs in Kafka the ACL looks to be set to the 
principal (SPN) that is used to authenticate against Zookeeper.

For example I have brokers:
 * *01.kafka.dev.example.com*
 * *02.kafka.dev.example.com*
 * *03.kafka.dev.example.com*

On *01.kafka.dev.example.com* I run the security-migration tool as below:
{code:java}
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 
-Dzookeeper.sasl.clientconfig=ZkClient" zookeeper-security-migration 
--zookeeper.acl=secure --zookeeper.connect=a01.zookeeper.dev.example.com:2181
{code}
I end up with ACLs in Zookeeper as below:
{code:java}
# [zk: localhost:2181(CONNECTED) 2] getAcl /cluster
# 'sasl,'kafka/01.kafka.dev.example.com@EXAMPLE
# : cdrwa
{code}
This ACL means no other broker in the cluster can access the znode in Zookeeper 
except broker 01.

To resolve the issue you need to set the below properties in Zookeeper's config:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = true
{code}
Now when Kafka sets ACLs they are stored as:

 





[jira] [Comment Edited] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-22 Thread Mr Kafka (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659769#comment-16659769
 ] 

Mr Kafka edited comment on KAFKA-7510 at 10/22/18 10:33 PM:


[~mjsax]
{quote}Why does it not contradict KAFKA-6538? It is about adding key/value in 
human readable form to exception messages that end up in the logs. While this 
ticket is about removing key/value data from the logs. What do you mean by "it 
only affects it's implementation"?
{quote}
You can still add key/value/headers or as much useful data as you want to the 
log messages; those log messages will be at TRACE level and a user has to 
explicitly turn TRACE on to see the extra information.

Happy to create a PR to move the key/value logging in RecordCollectorImpl out 
to trace, as it directly blocks me and I'm likely to do this in a private fork 
regardless until it is in an official release. Not so interested in creating a 
KIP and long discussions on moving sensitive log data to a different log level, 
which is as simple as adding a *log.trace(...)* below the *log.error(...)* 
statements.
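A minimal sketch of that suggestion (not an actual patch; it reuses the variable names from the recordSendError snippet quoted below, and the message text is illustrative):
{code:java}
// Keep the ERROR line free of payload data and point the operator at TRACE.
log.error("{} Error sending record to topic {} at timestamp {}; " +
        "enable TRACE logging to see the failed record's key/value. Exception: {}",
        logPrefix, topic, timestamp, exception.toString());
// Only emitted when TRACE is explicitly enabled, so payloads stay out of default logs.
log.trace("{} Failed record: key {} value {} timestamp {} topic {}",
        logPrefix, key, value, timestamp, topic);
{code}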


was (Author: mrkafka):
[~mjsax]

{quote}

Why does it not contradict KAFKA-6538? It is about adding key/value in human 
readable form to exception messages that end up in the logs. While this ticket 
is about removing key/value data from the logs. What do you mean by "it only 
affects it's implementation"?

{quote}

You can still add key/value/headers or as much useful data as you want to the 
log messages; those log messages will be at TRACE level and a user has to 
explicitly turn TRACE on to see the extra information.

Happy to create a PR to move the key/value logging in RecordCollectorImpl out 
to trace, as it directly blocks me and I'm likely to do this in a private fork 
regardless until it is done. Not so interested in creating a KIP and long 
discussions on moving sensitive log data to a different log level, which is as 
simple as adding a *log.trace(...)* below the *log.error(...)* statements.

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage*, 
> *timestamp*, *topic* and *stackTrace*.
> {code:java}
> private  void recordSendError(
> final K key,
> final V value,
> final Long timestamp,
> final String topic,
> final Exception exception
> ) {
> String errorLogMessage = LOG_MESSAGE;
> String errorMessage = EXCEPTION_MESSAGE;
> if (exception instanceof RetriableException) {
> errorLogMessage += PARAMETER_HINT;
> errorMessage += PARAMETER_HINT;
> }
> log.error(errorLogMessage, key, value, timestamp, topic, 
> exception.toString());
> sendException = new StreamsException(
> String.format(
> errorMessage,
> logPrefix,
> "an error caught",
> key,
> value,
> timestamp,
> topic,
> exception.toString()
> ),
> exception);
> }{code}





[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-22 Thread Mr Kafka (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659769#comment-16659769
 ] 

Mr Kafka commented on KAFKA-7510:
-

[~mjsax]

{quote}

Why does it not contradict KAFKA-6538? It is about adding key/value in human 
readable form to exception messages that end up in the logs. While this ticket 
is about removing key/value data from the logs. What do you mean by "it only 
affects it's implementation"?

{quote}

You can still add key/value/headers or as much useful data as you want to the 
log messages; those log messages will be at TRACE level and a user has to 
explicitly turn TRACE on to see the extra information.

Happy to create a PR to move the key/value logging in RecordCollectorImpl out 
to trace, as it directly blocks me and I'm likely to do this in a private fork 
regardless until it is done. Not so interested in creating a KIP and long 
discussions on moving sensitive log data to a different log level, which is as 
simple as adding a *log.trace(...)* below the *log.error(...)* statements.

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage*, 
> *timestamp*, *topic* and *stackTrace*.
> {code:java}
> private  void recordSendError(
> final K key,
> final V value,
> final Long timestamp,
> final String topic,
> final Exception exception
> ) {
> String errorLogMessage = LOG_MESSAGE;
> String errorMessage = EXCEPTION_MESSAGE;
> if (exception instanceof RetriableException) {
> errorLogMessage += PARAMETER_HINT;
> errorMessage += PARAMETER_HINT;
> }
> log.error(errorLogMessage, key, value, timestamp, topic, 
> exception.toString());
> sendException = new StreamsException(
> String.format(
> errorMessage,
> logPrefix,
> "an error caught",
> key,
> value,
> timestamp,
> topic,
> exception.toString()
> ),
> exception);
> }{code}





[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-21 Thread Mr Kafka (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658427#comment-16658427
 ] 

Mr Kafka commented on KAFKA-7510:
-

Some comments:
 * Agree this should be handled consistently throughout the suite of Kafka 
tools, i.e. Connect etc. I only raised it on RecordCollectorImpl as this was 
where I noticed the issue of leaking data.
 * This does not contradict https://issues.apache.org/jira/browse/KAFKA-6538; 
it only affects its implementation.
 * While not every application is sensitive, we should do due diligence by 
default, especially given the markets Kafka is trying to work with. Data 
belongs in *log.dirs*, not in log4j output at ERROR level by default.
 ** The only way to suppress sensitive information is to disable ERROR level 
logs. Doing so would make it impossible to run any serious production 
deployment of KStreams in heavily regulated environments without knowingly 
breaking some regulation (i.e. one about not leaking secret/sensitive 
information).
 ** There's no reason the ERROR level log message cannot contain "Enable 
TRACE logging to see failed message contents".
 ** By moving the output to TRACE level a user has to actively enable dumping 
data to log4j; they have made a conscious choice and had to consider their own 
operational requirements, so this becomes a feature switch. Further, by moving 
raw data dumping to DEBUG/TRACE level, a user can set up a separate log4j 
appender to handle this data: they can actively exclude it from going 
downstream, pipe it to its own file with further restricted access, etc. (see 
the log4j sketch after this list). Likewise, as this has to be actively 
enabled, enhanced contextual information can be added.
 ** Keys/values are rendered via *toString*. If an application has large 
messages (large even being 1 MB) and high throughput, then on large numbers of 
errors KStreams has the potential to denial-of-service itself by eating all 
available drive space; log4j output will likely be on the OS volume while 
data / RocksDB sit on a separate volume in a prod deployment.
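A hypothetical log4j 1.x fragment showing that kind of dedicated appender (the logger name, file path and sizes are illustrative assumptions, not a recommended configuration):
{code:java}
# Route the payload-dumping logger to its own restricted, size-bounded file
# instead of the main application log, and stop it propagating upwards.
log4j.logger.org.apache.kafka.streams.processor.internals.RecordCollectorImpl=TRACE, payload
log4j.additivity.org.apache.kafka.streams.processor.internals.RecordCollectorImpl=false

log4j.appender.payload=org.apache.log4j.RollingFileAppender
log4j.appender.payload.File=/var/log/kafka-streams/record-payloads.log
log4j.appender.payload.MaxFileSize=100MB
log4j.appender.payload.MaxBackupIndex=5
log4j.appender.payload.layout=org.apache.log4j.PatternLayout
log4j.appender.payload.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
{code}
The size cap also addresses the disk-space concern above: even with TRACE enabled, the payload log cannot grow without bound.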

 

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage*, 
> *timestamp*, *topic* and *stackTrace*.
> {code:java}
> private  void recordSendError(
> final K key,
> final V value,
> final Long timestamp,
> final String topic,
> final Exception exception
> ) {
> String errorLogMessage = LOG_MESSAGE;
> String errorMessage = EXCEPTION_MESSAGE;
> if (exception instanceof RetriableException) {
> errorLogMessage += PARAMETER_HINT;
> errorMessage += PARAMETER_HINT;
> }
> log.error(errorLogMessage, key, value, timestamp, topic, 
> exception.toString());
> sendException = new StreamsException(
> String.format(
> errorMessage,
> logPrefix,
> "an error caught",
> key,
> value,
> timestamp,
> topic,
> exception.toString()
> ),
> exception);
> }{code}





[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-16 Thread Mr Kafka (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652602#comment-16652602
 ] 

Mr Kafka commented on KAFKA-7510:
-

To Kafka the key and value are just bytes; those bytes may be anything and 
depend on an end user's use case. Kafka has no control over what a user sets 
for those values, so it's wrong to assume it is safe to log any of them.

In the most trivial scenario the key may be an email address or a credit card 
number; it may also be a full Avro record or some other complex blob which 
contains sensitive information. We just don't know. Kafka has no way of knowing 
what the values are or whether they are safe to log, so it should be behind a 
feature switch, the easiest being placing them at DEBUG level.

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage*, 
> *timestamp*, *topic* and *stackTrace*.
> {code:java}
> private  void recordSendError(
> final K key,
> final V value,
> final Long timestamp,
> final String topic,
> final Exception exception
> ) {
> String errorLogMessage = LOG_MESSAGE;
> String errorMessage = EXCEPTION_MESSAGE;
> if (exception instanceof RetriableException) {
> errorLogMessage += PARAMETER_HINT;
> errorMessage += PARAMETER_HINT;
> }
> log.error(errorLogMessage, key, value, timestamp, topic, 
> exception.toString());
> sendException = new StreamsException(
> String.format(
> errorMessage,
> logPrefix,
> "an error caught",
> key,
> value,
> timestamp,
> topic,
> exception.toString()
> ),
> exception);
> }{code}





[jira] [Created] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-15 Thread Mr Kafka (JIRA)
Mr Kafka created KAFKA-7510:
---

 Summary: KStreams RecordCollectorImpl leaks data to logs on error
 Key: KAFKA-7510
 URL: https://issues.apache.org/jira/browse/KAFKA-7510
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Mr Kafka


org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data on 
error as it dumps the *value* / message payload to the logs.

This is problematic as it may write personally identifiable information (PII) 
or other secret information to plain-text log files, which can then be 
propagated to other log systems, e.g. Splunk.

I suggest the *key* and *value* fields be moved to debug level, as they are 
useful for some people, while error level contains the *errorMessage*, 
*timestamp*, *topic* and *stackTrace*.
{code:java}
private <K, V> void recordSendError(
    final K key,
    final V value,
    final Long timestamp,
    final String topic,
    final Exception exception
) {
    String errorLogMessage = LOG_MESSAGE;
    String errorMessage = EXCEPTION_MESSAGE;
    if (exception instanceof RetriableException) {
        errorLogMessage += PARAMETER_HINT;
        errorMessage += PARAMETER_HINT;
    }
    log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
    sendException = new StreamsException(
        String.format(
            errorMessage,
            logPrefix,
            "an error caught",
            key,
            value,
            timestamp,
            topic,
            exception.toString()
        ),
        exception);
}{code}


