[jira] [Updated] (KAFKA-7710) Poor Zookeeper ACL management with Kerberos
[ https://issues.apache.org/jira/browse/KAFKA-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mr Kafka updated KAFKA-7710: Description:

I have seen many organizations run many Kafka clusters. The simplest scenario is a *kafka.dev.example.com* cluster and a *kafka.prod.example.com* cluster. In more extreme cases, teams within an organization may run their own individual clusters and want isolation between them. When you enable Zookeeper ACLs in Kafka, the ACL is set to the principal (SPN) that was used to authenticate against Zookeeper. For example, I have brokers:
* *01.kafka.dev.example.com*
* *02.kafka.dev.example.com*
* *03.kafka.dev.example.com*

On *01.kafka.dev.example.com* I run the security-migration tool as below:
{code:java}
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf -Dzookeeper.sasl.clientconfig=ZkClient" zookeeper-security-migration --zookeeper.acl=secure --zookeeper.connect=a01.zookeeper.dev.example.com:2181
{code}
I end up with ACLs in Zookeeper as below:
{code:java}
[zk: localhost:2181(CONNECTED) 2] getAcl /cluster
'sasl,'kafka/01.kafka.dev.example.com@EXAMPLE
: cdrwa
{code}
This ACL means no broker in the cluster other than broker 01 can access the znode. To resolve the issue you need to set the properties below in Zookeeper's config:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = true
{code}
Now when Kafka sets ACLs they are stored as:
{code:java}
[zk: localhost:2181(CONNECTED) 2] getAcl /cluster
'sasl,'kafka
: cdrwa
{code}
This means any broker in the cluster can access the znode. However, it also means a dev Kafka broker can write to a "prod.zookeeper.example.com" Zookeeper host: when it authenticates with the SPN "kafka/01.kafka.dev.example.com", the host part is dropped and it authenticates as the bare service principal "kafka".

If your organization is flexible you may be able to create a different Kerberos realm per cluster and use:
{code:java}
kerberos.removeHostFromPrincipal = true
kerberos.removeRealmFromPrincipal = false
{code}
ACLs will then be in the format "kafka@REALM", which means only brokers in the same realm can connect. The difficulty here is that your average large organization's security team is unlikely to be willing to create ad hoc realms.

*Proposal*

Kafka should support setting ACLs for all known brokers in the cluster, i.e. the ACLs on a znode would contain:
{code:java}
kafka/01.kafka.dev.example.com@EXAMPLE
kafka/02.kafka.dev.example.com@EXAMPLE
kafka/03.kafka.dev.example.com@EXAMPLE
{code}
With this, some kind of support will also need to be added so that when a new broker joins the cluster its host ACL gets added to existing znodes.

> Poor Zookeeper ACL management with Kerberos
> -------------------------------------------
>
> Key: KAFKA-7710
> URL: https://issues.apache.org/jira/browse/KAFKA-7710
> Project: Kafka
> Issue Type: Bug
> Reporter: Mr Kafka
> Priority: Major
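The effect of the two kerberos.remove* properties above can be illustrated with a small, self-contained sketch. Assumption: {{aclIdentity}} is an illustrative helper that mirrors, in simplified form, how the SASL identity is derived from an SPN; it is not Zookeeper's actual API.

```java
public class PrincipalMangling {
    // An SPN has the form primary/host@REALM, e.g. kafka/01.kafka.dev.example.com@EXAMPLE.
    static String aclIdentity(String spn, boolean removeHost, boolean removeRealm) {
        String primary = spn;
        String host = null;
        String realm = null;
        int at = primary.indexOf('@');
        if (at >= 0) {                                  // split off the realm
            realm = primary.substring(at + 1);
            primary = primary.substring(0, at);
        }
        int slash = primary.indexOf('/');
        if (slash >= 0) {                               // split off the host
            host = primary.substring(slash + 1);
            primary = primary.substring(0, slash);
        }
        StringBuilder id = new StringBuilder(primary);
        if (!removeHost && host != null) id.append('/').append(host);
        if (!removeRealm && realm != null) id.append('@').append(realm);
        return id.toString();
    }

    public static void main(String[] args) {
        String dev = "kafka/01.kafka.dev.example.com@EXAMPLE";
        String prod = "kafka/01.kafka.prod.example.com@EXAMPLE";
        // Default: the ACL identity is pinned to one broker's full SPN.
        System.out.println(aclIdentity(dev, false, false));
        // Both remove* flags true: every broker, dev or prod, collapses to "kafka",
        // which is why a dev broker can authenticate against a prod Zookeeper.
        System.out.println(aclIdentity(dev, true, true).equals(aclIdentity(prod, true, true)));
    }
}
```

This is why the all-brokers-in-the-ACL proposal trades convenience for an explicit allow-list: the identity collapse is what grants cross-cluster access.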
[jira] [Created] (KAFKA-7710) Poor Zookeeper ACL management with Kerberos
Mr Kafka created KAFKA-7710: --- Summary: Poor Zookeeper ACL management with Kerberos Key: KAFKA-7710 URL: https://issues.apache.org/jira/browse/KAFKA-7710 Project: Kafka Issue Type: Bug Reporter: Mr Kafka -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
[ https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659769#comment-16659769 ] Mr Kafka edited comment on KAFKA-7510 at 10/22/18 10:33 PM:

[~mjsax]
{quote}Why does it not contradict KAFKA-6538? It is about adding key/value in human readable form to exception messages that end up in the logs. While this ticket is about removing key/value data from the logs. What do you mean by "it only affects it's implementation"?{quote}
You can still add key/value/headers or as much other useful data as you want to the log messages; those log messages will be at TRACE level, and a user has to explicitly turn TRACE on to see the extra information. Happy to create a PR to move the key/value logging in RecordCollectorImpl to trace, as this directly blocks me and I'm likely to make the change in a private fork regardless until it is in an official release. Not so interested in creating a KIP and having long discussions about moving sensitive log data to a different log level when the change is as simple as adding a *log.trace(...)* below the *log.error(...)* statements.

> KStreams RecordCollectorImpl leaks data to logs on error
> --------------------------------------------------------
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Mr Kafka
> Priority: Major
> Labels: user-experience
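The log.error/log.trace split argued for above can be sketched with the JDK's own logging API. Assumptions: Kafka Streams actually uses SLF4J/log4j, so java.util.logging (SEVERE/FINEST standing in for ERROR/TRACE) is only a stand-in here, and the class, method, and message strings are illustrative, not Kafka's actual code.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class RecordErrorLogging {
    private static final Logger log = Logger.getLogger(RecordErrorLogging.class.getName());

    // Metadata-only message for ERROR level: no key/value payload, plus a hint.
    static String errorLine(long timestamp, String topic, Exception e) {
        return String.format(
            "Error sending record to topic %s (timestamp %d): %s. "
            + "Enable TRACE logging to see failed message contents.",
            topic, timestamp, e);
    }

    // Payload message, emitted only when trace-level logging is switched on.
    static String traceLine(Object key, Object value) {
        return String.format("Failed record key=%s value=%s", key, value);
    }

    static void recordSendError(Object key, Object value, long timestamp, String topic, Exception e) {
        log.severe(errorLine(timestamp, topic, e));   // always logged, payload-free
        if (log.isLoggable(Level.FINEST)) {           // JUL's nearest equivalent of TRACE
            log.finest(traceLine(key, value));
        }
    }

    public static void main(String[] args) {
        // With default logging config only the SEVERE line appears; the payload stays out.
        recordSendError("card-4111", "secret-payload", 0L, "payments", new RuntimeException("broker down"));
    }
}
```

The point of the split is that the default ERROR output remains useful for operators while the sensitive payload requires an explicit opt-in.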
[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
[ https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658427#comment-16658427 ] Mr Kafka commented on KAFKA-7510:

Some comments:
* Agree this should be handled consistently throughout the suite of Kafka tools, i.e. Connect etc. I only raised it on RecordCollectorImpl as this is where I noticed the issue of leaking data.
* This does not contradict https://issues.apache.org/jira/browse/KAFKA-6538; it only affects its implementation.
* While not every application is sensitive, we should do due diligence by default, especially given the markets Kafka is trying to work in. Data belongs in *log.dirs*, not in log4j output at ERROR level by default.
** Today the only way to suppress the sensitive information is to disable ERROR-level logs. Doing so would make it impossible to run any serious production deployment of KStreams in a heavily regulated environment without knowingly breaking some regulation, i.e. by leaking secret/sensitive information.
** There's no reason the ERROR-level message cannot include "Enable TRACE logging to see failed message contents".
** By moving the output to TRACE level, a user has to actively enable dumping data to log4j; they have made a conscious choice, weighing their own operational requirements, so this becomes a feature switch. Further, with raw data dumped at DEBUG/TRACE level, a user can set up a separate log4j appender to handle it: they can actively exclude it from going downstream, pipe it to its own file with further restricted access, and so on. Likewise, as this has to be actively enabled, enhanced contextual information can be added.
** Keys/values are rendered via *toString*. If an application has large message sizes, large even being 1 MB, and also has high throughput, then under a large volume of errors KStreams has the potential to denial-of-service itself by eating all available drive space: in a prod deployment the log4j output will likely be on the OS volume, while the data / RocksDB files sit on a separate volume.
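The separate-appender idea above can be sketched with java.util.logging. Assumptions: the logger name "kafka.streams.payloads" and the CapturingHandler are illustrative stand-ins for a log4j appender configuration; in real log4j this would be a dedicated logger with additivity=false and its own file appender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class PayloadRouting {
    // Collects log messages in memory so the routing can be shown without files.
    static class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();
        @Override public void publish(LogRecord r) { messages.add(r.getMessage()); }
        @Override public void flush() { }
        @Override public void close() { }
    }

    static final CapturingHandler appOut = new CapturingHandler();
    static final CapturingHandler payloadOut = new CapturingHandler();

    static void demo() {
        appOut.messages.clear();
        payloadOut.messages.clear();

        Logger app = Logger.getLogger("kafka.streams");
        Logger payloads = Logger.getLogger("kafka.streams.payloads");
        payloads.setUseParentHandlers(false);   // keep payloads out of the normal handler chain
        payloads.setLevel(Level.FINEST);        // payload logging must be enabled explicitly

        app.addHandler(appOut);
        payloads.addHandler(payloadOut);

        app.severe("send failed for topic payments; enable the payload logger for contents");
        payloads.finest("key=card-1234 value=<raw bytes>");

        app.removeHandler(appOut);              // avoid duplicate handlers on repeated runs
        payloads.removeHandler(payloadOut);
    }

    public static void main(String[] args) {
        demo();
        System.out.println(appOut.messages);     // metadata line only
        System.out.println(payloadOut.messages); // payload line only
    }
}
```

Because the payload stream is its own logger, an operator can pipe it to a restricted file, exclude it from downstream shippers like Splunk, or leave it disabled entirely.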
[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
[ https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652602#comment-16652602 ] Mr Kafka commented on KAFKA-7510:

To Kafka the key and value are just bytes; those bytes may be anything, depending on the end user's use case. Kafka has no control over what a user puts in those values, so it is wrong to assume any of them are safe to log. In the most trivial scenario the key may be an email address or a credit card number; it may equally be a full Avro record or some other complex blob that contains sensitive information. We just don't know. Since Kafka has no way of knowing whether the values are safe to log, logging them should be behind a feature switch, the easiest option being to place them at DEBUG level.
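Since the key and value are arbitrary bytes whose sensitivity Kafka cannot judge, one hedged alternative to calling toString() on them is to log only the size plus a capped hex preview, gated behind a switch. The helper below is illustrative, not part of Kafka's API.

```java
public class PayloadRedaction {
    // Describes a payload for logging: size always, contents only when the
    // dump switch is on, and even then only a capped hex preview.
    static String describe(byte[] payload, boolean dumpEnabled, int previewBytes) {
        if (payload == null) return "null";
        if (!dumpEnabled) return String.format("<%d bytes, contents suppressed>", payload.length);
        StringBuilder hex = new StringBuilder();
        int n = Math.min(previewBytes, payload.length);
        for (int i = 0; i < n; i++) hex.append(String.format("%02x", payload[i]));
        if (payload.length > n) hex.append("...");  // mark truncation
        return String.format("<%d bytes: %s>", payload.length, hex);
    }

    public static void main(String[] args) {
        byte[] card = "4111111111111111".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(describe(card, false, 8)); // <16 bytes, contents suppressed>
        System.out.println(describe(card, true, 8));  // <16 bytes: 3431313131313131...>
    }
}
```

The cap also addresses the disk-space concern from the earlier comment: a 1 MB record contributes a fixed-size line to the logs instead of its full contents.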
[jira] [Created] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
Mr Kafka created KAFKA-7510: --- Summary: KStreams RecordCollectorImpl leaks data to logs on error Key: KAFKA-7510 URL: https://issues.apache.org/jira/browse/KAFKA-7510 Project: Kafka Issue Type: Bug Components: streams Reporter: Mr Kafka

org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data on error as it dumps the *value* / message payload to the logs. This is problematic as it may write personally identifiable information (PII) or other secret information to plain-text log files, which can then be propagated to other log systems, i.e. Splunk. I suggest the *key* and *value* fields be moved to debug level, as they are useful for some people, while error level contains the *errorMessage, timestamp, topic* and *stackTrace*.

{code:java}
private void recordSendError(
    final K key,
    final V value,
    final Long timestamp,
    final String topic,
    final Exception exception
) {
    String errorLogMessage = LOG_MESSAGE;
    String errorMessage = EXCEPTION_MESSAGE;
    if (exception instanceof RetriableException) {
        errorLogMessage += PARAMETER_HINT;
        errorMessage += PARAMETER_HINT;
    }
    log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
    sendException = new StreamsException(
        String.format(
            errorMessage,
            logPrefix,
            "an error caught",
            key,
            value,
            timestamp,
            topic,
            exception.toString()
        ),
        exception);
}
{code}