[GitHub] spark pull request #23274: [SPARK-26322][SS] Add spark.kafka.token.sasl.mech...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23274#discussion_r240216147 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). -The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`, +The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` set, Spark considers the following log in options, in order of preference: -- **JAAS login configuration** +- **JAAS login configuration**, please see example below. --- End diff -- Added this small pointer to make things more clear. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23274: [SPARK-26322][SS] Add spark.kafka.token.sasl.mech...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23274#discussion_r240215906 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). -The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`, +The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` set, --- End diff -- I've found this wording issue so fixed it here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23274: [SPARK-26322][SS] Add spark.kafka.token.sasl.mech...
GitHub user gaborgsomogyi opened a pull request: https://github.com/apache/spark/pull/23274 [SPARK-26322][SS] Add spark.kafka.token.sasl.mechanism to ease delegation token configuration. ## What changes were proposed in this pull request? When a Kafka delegation token is obtained, a SCRAM `sasl.mechanism` has to be configured for authentication. This can be configured on the related source/sink, which is inconvenient from the user's perspective. Such granularity is not required and this configuration can be implemented with one central parameter. In this PR `spark.kafka.token.sasl.mechanism` is added to configure this centrally (default: `SCRAM-SHA-512`). ## How was this patch tested? Existing unit tests + on cluster. You can merge this pull request into a Git repository by running: $ git pull https://github.com/gaborgsomogyi/spark SPARK-26322 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23274.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23274 commit 320040a90c58c87ce1b21ba6fdc16b703122e8b1 Author: Gabor Somogyi Date: 2018-12-07T15:02:17Z [SPARK-26322][SS] Add spark.kafka.token.sasl.mechanism to ease delegation token configuration.
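The idea of replacing per-source/sink configuration with one central parameter can be sketched as follows. This is a simplified model, not Spark's actual resolution code: the config key and default match the PR, but the resolver object is purely illustrative.

```scala
// Hypothetical sketch of central SASL mechanism resolution.
// The key and default mirror the PR; the resolver itself is illustrative only.
object SaslMechanismResolution {
  val CentralKey = "spark.kafka.token.sasl.mechanism"
  val Default = "SCRAM-SHA-512" // default proposed in the PR

  // A value set in the Spark conf wins; otherwise fall back to the default.
  def resolve(sparkConf: Map[String, String]): String =
    sparkConf.getOrElse(CentralKey, Default)
}

// Before this PR each source/sink needed its own "kafka.sasl.mechanism";
// afterwards one Spark-level parameter covers all of them.
val noOverride = SaslMechanismResolution.resolve(Map.empty)
val overridden = SaslMechanismResolution.resolve(
  Map("spark.kafka.token.sasl.mechanism" -> "SCRAM-SHA-256"))
```

The design choice is that mechanism granularity per source/sink is unnecessary, so one application-wide setting suffices.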
[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23156 BTW, I'm coming back to your clean-up PR, but it takes some time to switch context :)
[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23156 Ah, OK. This solution was agreed on with him in https://github.com/apache/spark/pull/20936.
[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23156 I thought this part was not affected. Who is leading it? I'm asking because I haven't seen progress anywhere.
[GitHub] spark issue #23254: [SPARK-26304][SS] Add default value to spark.kafka.sasl....
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23254 cc @vanzin
[GitHub] spark issue #23254: [SPARK-26304][SS] Add default value to spark.kafka.sasl....
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23254 Unrelated; filed a JIRA: https://issues.apache.org/jira/browse/SPARK-26306
[GitHub] spark issue #23254: [SPARK-26304][SS] Add default value to spark.kafka.sasl....
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23254 retest this please
[GitHub] spark pull request #23254: [SPARK-26304][SS] Add default value to spark.kafk...
GitHub user gaborgsomogyi opened a pull request: https://github.com/apache/spark/pull/23254 [SPARK-26304][SS] Add default value to spark.kafka.sasl.kerberos.service.name parameter ## What changes were proposed in this pull request? spark.kafka.sasl.kerberos.service.name is an optional parameter, but most of the time the value `kafka` has to be set. As I've written in the JIRA, the reasoning is the following: * Kafka's configuration guide suggests the same value: https://kafka.apache.org/documentation/#security_sasl_kerberos_brokerconfig * It makes things easier for Spark users by requiring less configuration * Other streaming engines do the same In this PR I've changed the parameter from optional to `WithDefault` and set `kafka` as the default value. ## How was this patch tested? Available unit tests + on cluster. You can merge this pull request into a Git repository by running: $ git pull https://github.com/gaborgsomogyi/spark SPARK-26304 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23254.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23254 commit 3f56e78e45e9c983816d8222e06d33092ecb2993 Author: Gabor Somogyi Date: 2018-12-06T15:37:35Z [SPARK-26304][SS] Add default value to spark.kafka.sasl.kerberos.service.name parameter
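The "optional to `WithDefault`" change can be sketched with a toy config-entry model. `ConfigEntry` here is a stand-in, not Spark's internal class of the same name; only the config key and the `kafka` default come from the PR.

```scala
// Hypothetical sketch of an optional config entry gaining a default value.
// The key matches the PR; ConfigEntry is a stand-in for illustration.
final case class ConfigEntry(key: String, default: Option[String]) {
  // Return the user-set value if present, otherwise the default (if any).
  def readFrom(conf: Map[String, String]): Option[String] =
    conf.get(key).orElse(default)
}

// Before the PR: no default, so users had to set it almost every time.
val before = ConfigEntry("spark.kafka.sasl.kerberos.service.name", None)
// After the PR: defaults to "kafka", matching Kafka's own configuration guide.
val after  = ConfigEntry("spark.kafka.sasl.kerberos.service.name", Some("kafka"))
```

An explicit setting still overrides the default, so clusters with a non-standard service name are unaffected.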
[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23156 @arunmahadevan As I understand it, this is more about renaming the config than changing what the PR basically does; have I understood it well? Using backpressure instead of stopping the query was already agreed on in other PRs, please check them. Once the backlog reaches 10k items there is no way back.
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r239480501 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,199 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)** +- **JAAS login configuration** + +### Delegation token + +This way the application can be configured via Spark parameters and may not need JAAS login +configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information +about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + +The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`, +Spark considers the following log in options, in order of preference: +- **JAAS login configuration** +- **Keytab file**, such as, + + ./bin/spark-submit \ + --keytab \ + --principal \ + --conf spark.kafka.bootstrap.servers= \ + ... + +- **Kerberos credential cache**, such as, + + ./bin/spark-submit \ + --conf spark.kafka.bootstrap.servers= \ + ... + +The Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`). 
+ +Spark can be configured to use the following authentication protocols to obtain token (it must match with +Kafka broker configuration): +- **SASL SSL (default)** +- **SSL** +- **SASL PLAINTEXT (for testing)** + +After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly. +Delegation token uses `SCRAM` login module for authentication and because of that the appropriate +`sasl.mechanism` has to be configured on source/sink: + + + +{% highlight scala %} + +// Setting on Kafka Source for Streaming Queries --- End diff -- SCRAM-SHA-256 and SCRAM-SHA-512 are supported at the moment. I've added a note that it needs to match. This is definitely an improvement area and I will file a JIRA soon (among other things, because I found more simplifications from the user's perspective).
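The doc text above says the appropriate `sasl.mechanism` has to be configured on the source/sink. A minimal sketch of the resulting option map for a Structured Streaming Kafka source (shown as a plain map so it stays self-contained; the broker address and topic are placeholders, the option keys follow the Kafka integration docs):

```scala
// Options as they would be passed to spark.readStream.format("kafka").
// "kafka."-prefixed keys are forwarded to the Kafka consumer.
val sourceOptions = Map(
  "kafka.bootstrap.servers" -> "broker1:9092",   // placeholder host
  "kafka.sasl.mechanism"    -> "SCRAM-SHA-512",  // must match the token's mechanism
  "subscribe"               -> "topic1"          // placeholder topic
)
```

The same `kafka.sasl.mechanism` option would be set on a sink via `writeStream`.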
[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23156 @arunmahadevan I don't fully understand your comment: > Rather than controlling the queue sizes it would be better to limit the max epoch backlog and fail the query once that threshold is reached. I've written the following in the PR description: > If the related threshold reached then the query will stop with IllegalStateException. AFAIK `max epoch backlog` == `epochsWaitingToBeCommitted`, which is a queue, but that's not the only unbounded part of `EpochCoordinator` (please see the additional unit tests). As a result I've limited `partitionOffsets` and `partitionCommits` as well.
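The bounding behavior described above (stop the query with `IllegalStateException` when a tracked collection hits its threshold) can be sketched like this. This is an illustrative model, not the actual `EpochCoordinator` code; the class name and threshold value are made up for the example.

```scala
import scala.collection.mutable

// Illustrative sketch of the bounding described in the comment: once a
// tracked collection reaches the configured threshold, fail fast instead
// of letting it grow without limit.
class BoundedEpochBacklog(maxBacklog: Int) {
  private val epochsWaitingToBeCommitted = mutable.Queue.empty[Long]

  def enqueue(epoch: Long): Unit = {
    if (epochsWaitingToBeCommitted.size >= maxBacklog) {
      throw new IllegalStateException(
        s"Epoch backlog reached its maximum size of $maxBacklog")
    }
    epochsWaitingToBeCommitted.enqueue(epoch)
  }

  def size: Int = epochsWaitingToBeCommitted.size
}

val backlog = new BoundedEpochBacklog(maxBacklog = 3)
(1L to 3L).foreach(backlog.enqueue)
val failed =
  try { backlog.enqueue(4L); false }
  catch { case _: IllegalStateException => true }
```

The same guard would apply to each unbounded structure mentioned in the comment, not just the commit queue.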
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22952 @HeartSaVioR It was tested with S3, and the trick is to have a HUGE number of files. Listing files is pathologically bad, as @steveloughran stated; glob is even worse.
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22952 @HeartSaVioR What counts as "not a big deal" is debatable; I've seen a glob request take ~1 hour when a huge number of files were stored :) If file moves are even worse, that's one more reason to move this to a separate thread.
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238995809 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` + set Spark looks for authentication information in the following order and choose the first available to log in: + - **JAAS login configuration** + - **Keytab file**, such as, + +./bin/spark-submit \ +--keytab \ +--principal \ +--conf spark.kafka.bootstrap.servers= \ +... + + - **Kerberos credential cache**, such as, + +./bin/spark-submit \ +--conf spark.kafka.bootstrap.servers= \ +... + + Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`). 
+ + Spark can be configured to use the following authentication protocols to obtain token: + - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption. + - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticates +clients through certificates. Please note 2-way authentication must be enabled on Kafka brokers. + - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos used for authentication but +because there is no encryption it's only for testing purposes. + + After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly. + Delegation token uses `SCRAM` login module for authentication and because of that the appropriate + `sasl.mechanism` has to be configured on source/sink. --- End diff -- It means exactly that. This was missing; I've added an example.
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238995441 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` + set Spark looks for authentication information in the following order and choose the first available to log in: + - **JAAS login configuration** + - **Keytab file**, such as, + +./bin/spark-submit \ +--keytab \ +--principal \ +--conf spark.kafka.bootstrap.servers= \ +... + + - **Kerberos credential cache**, such as, + +./bin/spark-submit \ +--conf spark.kafka.bootstrap.servers= \ +... + + Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`). --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238995312 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` + set Spark looks for authentication information in the following order and choose the first available to log in: --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238994314 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,56 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. This is enabled by default + but can be turned off with `spark.security.credentials.kafka.enabled`. When + `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following + order and choose the first available to log in: + - **JAAS login configuration** + - **Keytab file**, such as, + +./bin/spark-submit \ +--keytab \ +--principal \ +--conf spark.kafka.bootstrap.servers= \ +... + + - **Kerberos credential cache**, such as, + +./bin/spark-submit \ +--conf spark.kafka.bootstrap.servers= \ +... + + Spark supports the following authentication protocols to obtain token: --- End diff -- OK, fixed. 
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22952 @HeartSaVioR @steveloughran As I see it, not only `*` and `?` are missing but `[]` as well. * Bringing a full glob parser into Spark and supporting it seems too heavy and brittle to me. * Considering this, I would solve it with warnings plus a caveat message in the docs (mentioning the slow globbing on object stores). As a separate off-topic note, I wonder how Hadoop's globbing works if the expander doesn't support all the glob elements. Maybe other operators (like `[]`) are handled in a different part of the code!?
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22952 @HeartSaVioR I've taken a look at the possibilities: * [GlobExpander](https://github.com/apache/hadoop/blob/a55d6bba71c81c1c4e9d8cd11f55c78f10a548b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobExpander.java#L63) is private * Using `globStatus` recursively is not an option because of its poor performance * `globStatus` with a limited scope can be an option, but there are cases where it might take considerable execution time * Printing warnings and not moving files is an option that seems feasible
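Since `GlobExpander` is private and a full glob parser is off the table, the "print warnings" option only needs a conservative detector for glob metacharacters. A minimal sketch (the character set covers `*`, `?` and `[]` from the thread plus `{}`; this helper is hypothetical, not code from the PR):

```scala
// Conservative check for glob metacharacters discussed in the thread.
// A path containing any of these would take the warning path instead of
// attempting to expand or archive into the pattern.
def containsGlobMeta(path: String): Boolean =
  path.exists(c => "*?[]{}".contains(c))
```

Being conservative here means some literal paths containing brackets are flagged unnecessarily, which is acceptable for a warning-only approach.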
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238599018 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,56 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. This is enabled by default + but can be turned off with `spark.security.credentials.kafka.enabled`. When + `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following + order and choose the first available to log in: + - **JAAS login configuration** + - **Keytab file**, such as, + +./bin/spark-submit \ +--keytab \ +--principal \ +--conf spark.kafka.bootstrap.servers= \ +... + + - **Kerberos credential cache**, such as, + +./bin/spark-submit \ +--conf spark.kafka.bootstrap.servers= \ +... + + Spark supports the following authentication protocols to obtain token: + - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption. 
+ - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticates +clients through certificates. Please note 2-way authentication must be enabled on Kafka brokers. + - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos used for authentication but +because there is no encryption it's only for testing purposes. + + After obtaining delegation token successfully, Spark spreads it across nodes and renews it accordingly. + Delegation token uses `SCRAM` login module for authentication. --- End diff -- The `SCRAM` module supports only a few `sasl.mechanism` values, such as `SCRAM-SHA-256` and `SCRAM-SHA-512`, which have to be configured on the source/sink. I've updated the description to reflect this.
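Since only a few `sasl.mechanism` values work with the `SCRAM` login module, a small validation sketch makes the constraint concrete. The mechanism names come from the comment above; the validator itself is illustrative, not Spark or Kafka code:

```scala
// SCRAM mechanisms the thread names as supported for delegation tokens.
val supportedScramMechanisms = Set("SCRAM-SHA-256", "SCRAM-SHA-512")

// Accept a configured mechanism or explain why it is rejected.
def validateMechanism(m: String): Either[String, String] =
  if (supportedScramMechanisms.contains(m)) Right(m)
  else Left(s"Unsupported sasl.mechanism for delegation tokens: $m")
```

The configured value must also match the Kafka broker's configuration, which is the point the doc update adds.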
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238596541 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,56 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. This is enabled by default + but can be turned off with `spark.security.credentials.kafka.enabled`. When + `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following + order and choose the first available to log in: + - **JAAS login configuration** + - **Keytab file**, such as, + +./bin/spark-submit \ +--keytab \ +--principal \ +--conf spark.kafka.bootstrap.servers= \ +... + + - **Kerberos credential cache**, such as, + +./bin/spark-submit \ +--conf spark.kafka.bootstrap.servers= \ +... + + Spark supports the following authentication protocols to obtain token: + - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption. 
+ - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticates +clients through certificates. Please note 2-way authentication must be enabled on Kafka brokers. + - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos used for authentication but +because there is no encryption it's only for testing purposes. + + After obtaining delegation token successfully, Spark spreads it across nodes and renews it accordingly. --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238596372 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,56 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. This is enabled by default + but can be turned off with `spark.security.credentials.kafka.enabled`. When + `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following + order and choose the first available to log in: + - **JAAS login configuration** + - **Keytab file**, such as, + +./bin/spark-submit \ +--keytab \ +--principal \ +--conf spark.kafka.bootstrap.servers= \ +... + + - **Kerberos credential cache**, such as, + +./bin/spark-submit \ +--conf spark.kafka.bootstrap.servers= \ +... + + Spark supports the following authentication protocols to obtain token: --- End diff -- > "Spark supports" Maybe `Spark can be configured to use` is better phrase. 
> explaining each option here is not really that helpful * I think the list must be kept (maybe without the explanations) because the existence of an authentication protocol in Kafka doesn't mean Spark is prepared to use it. * With the explanations I wanted to give a high-level feeling for what each option roughly does; Kafka's docs are there for a deeper look. I'm neutral on removing them. Should we?
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238593413 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,56 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. This is enabled by default --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22952 @HeartSaVioR Regarding the glob part, @zsxwing pointed out an important problem. A glob pattern is much more than checking `*` and `?`, see the link above. For simplicity take this test: ``` ... val sourcePath = "/hello/worl{d}" val archivePath = "/hello/world/spark" ... ``` This should throw `IllegalArgumentException` but proceeds without an exception. A proper glob parser should be used.
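The problem can be illustrated outside Spark as well. The following Python sketch (not Spark's actual implementation; `expand_braces` and `glob_overlaps` are hypothetical helpers, and only simple `{a,b}` alternation is handled) shows why a plain `*`/`?` check is not enough: the brace pattern `/hello/worl{d}` can match `/hello/world`, so the archive path does overlap the source pattern.

```python
import fnmatch
import re

def expand_braces(pattern):
    # Minimal brace expansion: "{a,b}" -> two alternatives. Nested or
    # escaped braces are not handled; this is only an illustration.
    m = re.search(r"\{([^{}]*)\}", pattern)
    if not m:
        return [pattern]
    head, tail = pattern[:m.start()], pattern[m.end():]
    return [expanded
            for alt in m.group(1).split(",")
            for expanded in expand_braces(head + alt + tail)]

def glob_overlaps(source_glob, archive_path):
    # The archive path overlaps the source pattern when some expansion of
    # the pattern matches the archive path or one of its ancestors.
    parts = archive_path.split("/")
    ancestors = ["/".join(parts[:i]) or "/" for i in range(2, len(parts) + 1)]
    return any(fnmatch.fnmatchcase(ancestor, pat)
               for pat in expand_braces(source_glob)
               for ancestor in ancestors)

# "/hello/worl{d}" expands to "/hello/world", which IS an ancestor of
# "/hello/world/spark" -- so a correct check should reject this pair.
print(glob_overlaps("/hello/worl{d}", "/hello/world/spark"))  # -> True
```

A naive string comparison of `"/hello/worl{d}"` against `"/hello/world/spark"` finds no common prefix, which is exactly why the original check let this pair through.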
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22952 @HeartSaVioR I've taken a deeper look at the overlap thing and found the following. * Added an additional test which produced an odd result: ``` ... val sourcePath = "/hello/worl" val archivePath = "/hello/world/spark" ... ``` This throws `IllegalArgumentException` even though `sourcePath` is different from `archivePath`. This happens without any glob magic. * This approach may not work if symlinks are involved (`fs.makeQualified` doesn't resolve links). * HDFS does not support symlink resolution yet, though it's on the way, see https://issues.apache.org/jira/browse/HADOOP-10019 * S3 and ADLS do not support it either. All in all this part is fine now. Checking the glob part...
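The false positive above comes from comparing raw strings instead of path components. A small Python sketch of a component-wise ancestor check (`is_ancestor` is a hypothetical helper for illustration, not Spark's code):

```python
from pathlib import PurePosixPath

def is_ancestor(source, archive):
    # Compare whole path components instead of raw strings:
    # "/hello/worl" is a *string* prefix of "/hello/world/spark",
    # but it is not a path ancestor of it.
    src = PurePosixPath(source).parts
    arc = PurePosixPath(archive).parts
    return len(src) <= len(arc) and arc[:len(src)] == src

print(is_ancestor("/hello/worl", "/hello/world/spark"))   # -> False
print(is_ancestor("/hello/world", "/hello/world/spark"))  # -> True
```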
[GitHub] spark issue #23195: [SPARK-26236][SS] Add kafka delegation token support doc...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23195 @HeartSaVioR thanks for the review! cc @steveloughran maybe also interested
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238211153 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. This is enabled by default + but can be turned off with `spark.security.credentials.kafka.enabled`. When + `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following + order and choose the first available to log in: + - **JAAS login configuration** + - **Keytab file**, such as, + +./bin/spark-submit \ +--keytab \ +--principal \ +--conf spark.kafka.bootstrap.servers= \ +... + + - **Kerberos credential cache**, such as, + +./bin/spark-submit \ +--conf spark.kafka.bootstrap.servers= \ +... + + Spark supports the following authentication protocols to obtain token: + - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption. 
+ - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticate +clients through certificates. Please note 2-way authentication must be enabled on Kafka brokers. + - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos used for authentication but +because there is no encryption it's only for testing purposes. + + After delegation token successfully obtained Spark spreads it across nodes and renews it accordingly. --- End diff -- Yeah, it's better that way and and changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238210009 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. This is enabled by default + but can be turned off with `spark.security.credentials.kafka.enabled`. When + `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following + order and choose the first available to log in: + - **JAAS login configuration** + - **Keytab file**, such as, + +./bin/spark-submit \ +--keytab \ +--principal \ +--conf spark.kafka.bootstrap.servers= \ +... + + - **Kerberos credential cache**, such as, + +./bin/spark-submit \ +--conf spark.kafka.bootstrap.servers= \ +... + + Spark supports the following authentication protocols to obtain token: + - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption. 
+ - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticate +clients through certificates. Please note 2-way authentication must be enabled on Kafka brokers. + - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos used for authentication but +because there is no encryption it's only for testing purposes. + + After delegation token successfully obtained Spark spreads it across nodes and renews it accordingly. + Delegation token uses `SCRAM` login module for authentication. + + When delegation token is available for example on an executor it can be overridden with JAAS login + configuration. +- **JAAS login configuration**: JAAS login configuration must be created and transferred to all --- End diff -- Yeah, I was focusing on transferring JAAS files which made the description more technical as it should be. Nice catch and fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238209243 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. This is enabled by default + but can be turned off with `spark.security.credentials.kafka.enabled`. When + `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following + order and choose the first available to log in: + - **JAAS login configuration** + - **Keytab file**, such as, + +./bin/spark-submit \ +--keytab \ +--principal \ +--conf spark.kafka.bootstrap.servers= \ +... + + - **Kerberos credential cache**, such as, + +./bin/spark-submit \ +--conf spark.kafka.bootstrap.servers= \ +... + + Spark supports the following authentication protocols to obtain token: + - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption. 
+ - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticate --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23195#discussion_r238209492 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark- See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Security + +Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed +description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the application can be configured + via Spark parameters and may not need JAAS login configuration (Spark can use Kafka's dynamic JAAS + configuration feature). For further information about delegation tokens, see + [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + + The process is initiated by Spark's Kafka delegation token provider. This is enabled by default + but can be turned off with `spark.security.credentials.kafka.enabled`. When + `spark.kafka.bootstrap.servers` set Spark looks for authentication information in the following + order and choose the first available to log in: + - **JAAS login configuration** + - **Keytab file**, such as, + +./bin/spark-submit \ +--keytab \ +--principal \ +--conf spark.kafka.bootstrap.servers= \ +... + + - **Kerberos credential cache**, such as, + +./bin/spark-submit \ +--conf spark.kafka.bootstrap.servers= \ +... + + Spark supports the following authentication protocols to obtain token: + - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for authentication and SSL for encryption. 
+ - **SSL**: It's leveraging a capability from SSL called 2-way authentication. The server authenticate +clients through certificates. Please note 2-way authentication must be enabled on Kafka brokers. + - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos used for authentication but +because there is no encryption it's only for testing purposes. + + After delegation token successfully obtained Spark spreads it across nodes and renews it accordingly. + Delegation token uses `SCRAM` login module for authentication. + + When delegation token is available for example on an executor it can be overridden with JAAS login --- End diff -- Yeah, removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23195: [SPARK-26236][SS] Add kafka delegation token support doc...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23195 cc @vanzin
[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...
GitHub user gaborgsomogyi opened a pull request: https://github.com/apache/spark/pull/23195 [SPARK-26236][SS] Add kafka delegation token support documentation. ## What changes were proposed in this pull request? Kafka delegation token support was implemented in [PR#22598](https://github.com/apache/spark/pull/22598), but that PR didn't contain documentation because of rapid changes. Since it has now been merged, this PR documents it. ## How was this patch tested? jekyll build + manual html check You can merge this pull request into a Git repository by running: $ git pull https://github.com/gaborgsomogyi/spark SPARK-26236 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23195.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23195 commit e53631b8f313a2b34ec38393ea14527c3f7d8458 Author: Gabor Somogyi Date: 2018-11-30T15:29:33Z [SPARK-26236][SS] Add kafka delegation token support documentation.
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237662539 --- Diff: core/src/main/scala/org/apache/spark/internal/config/Kafka.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.config + +private[spark] object Kafka { + + private[spark] val BOOTSTRAP_SERVERS = --- End diff -- Removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237656411 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier +import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME} + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + override def afterEach(): Unit = { +try { + resetUGI +} finally { + super.afterEach() +} + } + + private def addTokenToUGI(): Unit = { +val token = new Token[KafkaDelegationTokenIdentifier]( + tokenId.getBytes, + tokenPassword.getBytes, + KafkaTokenUtil.TOKEN_KIND, + KafkaTokenUtil.TOKEN_SERVICE +) +val creds = new Credentials() +creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token) +UserGroupInformation.getCurrentUser.addCredentials(creds) + } + + private def resetUGI: Unit = { --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237656366 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.{ util => ju } +import java.text.SimpleDateFormat + +import scala.util.control.NonFatal + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.JaasContext +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = { +val adminClientProperties = new ju.Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) 
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in and applied in the following order: --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237656297 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -688,4 +688,65 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val KAFKA_BOOTSTRAP_SERVERS = --- End diff -- Moved. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237495490 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- Now the following order applies: * Global JVM configuration which is kafka specific (kafka looks for KafkaClient entry) This can be configured many ways not just `java.security.auth.login.config` but the mentioned `JaasContext.loadClientContext` handles them. * Keytab * Ticket cache I've described this in the code as well to make the intention clear. --- -
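The preference order described in the comment above (Kafka-specific JVM-global JAAS `KafkaClient` entry first, then keytab, then Kerberos ticket cache) can be sketched as a simple resolver. This is an illustrative Python sketch, not the Scala implementation; the function and parameter names are made up for the example:

```python
def choose_login_option(has_jaas_client_entry, keytab_path, has_ticket_cache):
    """Pick the first available login mechanism, in order of preference."""
    # 1. A Kafka-specific JVM-global JAAS config (the KafkaClient entry) wins.
    if has_jaas_client_entry:
        return "jaas"
    # 2. Otherwise fall back to the keytab passed via --keytab/--principal.
    if keytab_path is not None:
        return "keytab"
    # 3. Finally fall back to the Kerberos ticket cache (e.g. from kinit).
    if has_ticket_cache:
        return "ticket-cache"
    raise RuntimeError("no Kafka login information available")
```

The point of the ordering is that an explicit, user-supplied JAAS configuration always overrides what Spark would otherwise derive automatically.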
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237492537 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- Added + tested the fallback mechanism. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
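The per-protocol property selection in the quoted `createAdminClientProperties` can be summarized with a sketch. This is not Spark's code: the trust/keystore dicts are stand-ins for the real SSL settings (Kafka's actual keys are along the lines of `ssl.truststore.location`), and only the branching logic mirrors the Scala `match` above:

```python
def security_props(protocol, truststore, keystore=None):
    # Mirrors the match above: SASL_SSL needs a truststore; SSL additionally
    # needs a keystore for 2-way authentication; SASL_PLAINTEXT needs neither
    # (and is for testing only, since the channel is unencrypted).
    props = {"security.protocol": protocol}
    if protocol == "SASL_SSL":
        props.update(truststore)
    elif protocol == "SSL":
        props.update(truststore)
        props.update(keystore or {})
    elif protocol == "SASL_PLAINTEXT":
        pass  # no TLS settings; plain channel
    else:
        raise ValueError(f"unsupported protocol: {protocol}")
    return props
```

For example, `security_props("SSL", ts, ks)` carries both stores, matching the warning in the quoted code that 2-way authentication must be enabled on the broker side for the SSL protocol.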
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237234310 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- Kafka has its public API to check for client config: `JaasContext.loadClientContext`. It throws an exception when no config is provided. This part works; testing the others... ---
[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23156 cc @jose-torres @HeartSaVioR ---
[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22598 retest this, please ---
[GitHub] spark pull request #23156: [SPARK-24063][SS] Add maximum epoch queue thresho...
GitHub user gaborgsomogyi opened a pull request: https://github.com/apache/spark/pull/23156 [SPARK-24063][SS] Add maximum epoch queue threshold for ContinuousExecution ## What changes were proposed in this pull request? Continuous processing waits on epochs which are not yet complete (for example, when one partition is not making progress) and stores pending items in queues. These queues are unbounded and can easily consume all the memory. In this PR I've added the `spark.sql.streaming.continuous.epochBacklogQueueSize` configuration option to make them bounded. If the related threshold is reached, the query will stop with an `IllegalStateException`. ## How was this patch tested? Existing + additional unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/gaborgsomogyi/spark SPARK-24063 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23156.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23156 commit 72733c5068d85e70e0a65b05a593c82120277622 Author: Gabor Somogyi Date: 2018-11-22T20:45:37Z [SPARK-24063][SS] Add maximum epoch queue threshold for ContinuousExecution. ---
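The bounded-backlog idea behind this PR can be sketched as follows. This is an illustrative stand-in only, assuming hypothetical names (`EpochBacklog`, `maxBacklogSize`) rather than the actual `ContinuousExecution` internals:

```scala
import scala.collection.mutable

// Hedged sketch: a queue that refuses to grow past a configured threshold,
// mirroring what spark.sql.streaming.continuous.epochBacklogQueueSize enables.
// Names here are illustrative, not the real Spark classes.
class EpochBacklog[T](maxBacklogSize: Int) {
  private val queue = mutable.Queue.empty[T]

  def enqueue(item: T): Unit = {
    if (queue.size >= maxBacklogSize) {
      // Failing fast is preferable to unbounded memory growth when one
      // partition stops making progress.
      throw new IllegalStateException(
        s"Size of the epoch queue has exceeded maximum allowed size of $maxBacklogSize")
    }
    queue.enqueue(item)
  }

  def size: Int = queue.size
}
```

With a threshold of 2, a third enqueue throws instead of silently consuming memory.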
[GitHub] spark issue #23095: [SPARK-23886][SS] Update query status for ContinuousExec...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23095 cc @tdas @zsxwing ---
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236613589 --- Diff: pom.xml --- @@ -128,6 +128,7 @@ 1.2.1.spark2 1.2.1 +2.1.0 --- End diff -- Fixed. ---
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236613501 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier +import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME} + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + private def addTokenToUGI: Unit = { +val token = new Token[KafkaDelegationTokenIdentifier]( + tokenId.getBytes, + tokenPassword.getBytes, + KafkaTokenUtil.TOKEN_KIND, + KafkaTokenUtil.TOKEN_SERVICE +) +val creds = new Credentials() +creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token) +UserGroupInformation.getCurrentUser.addCredentials(creds) + } + + private def resetUGI: Unit = { +UserGroupInformation.setLoginUser(null) + } + + test("getTokenJaasParams without token should return None") { +val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf) +assert(!jaasParams.isDefined) + } + + test("getTokenJaasParams with token no service should throw exception") { +try { + addTokenToUGI + + val thrown = intercept[IllegalArgumentException] { +KafkaSecurityHelper.getTokenJaasParams(sparkConf) + } + + assert(thrown.getMessage contains "Kerberos service name must be defined") +} finally { --- End diff -- Fixed. 
---
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236613436 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier +import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME} + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + private def addTokenToUGI: Unit = { --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236613351 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- The jira is: https://issues.apache.org/jira/browse/KAFKA-7677 The guys will take a look at it... Regarding `useTgtConfig`: I've considered this, and it would make the user experience much better. On the other hand, it would close off the possibility of using a custom JAAS configuration (I don't know how many people use this). I'm fine with that, but one has to accept this consequence. Should we do this then? ---
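The login fallback being discussed — try an explicit JAAS configuration first, then fall back to other credential sources — can be illustrated with a small sketch. All names here are hypothetical placeholders, not the actual Spark login machinery:

```scala
// Illustrative only: order-of-preference resolution of login options.
// The three parameters stand in for a user-supplied JAAS configuration,
// keytab-based parameters, and ticket-cache parameters.
def resolveJaasParams(
    userJaasConfig: Option[String],
    keytabParams: Option[String],
    ticketCacheParams: Option[String]): Option[String] = {
  userJaasConfig             // first preference: keeps custom JAAS setups working
    .orElse(keytabParams)    // then keytab-based login
    .orElse(ticketCacheParams) // finally the Kerberos ticket cache
}
```

The trade-off raised in the comment is visible here: if the first branch were removed in favor of always generating the configuration, custom JAAS setups would no longer take effect.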
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22952 @HeartSaVioR ok, feel free to ping me if review needed. ---
[GitHub] spark issue #23119: [SPARK-25954][SS][FOLLOWUP][test-maven] Add Zookeeper 3....
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23119 I've seen similar flakiness with `sbt` as well. ---
[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22598 retest this please ---
[GitHub] spark issue #23119: [SPARK-25954][SS][FOLLOWUP][TEST-MAVEN] Add Zookeeper 3....
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23119 Wondering why tests passed with sbt. Does sbt handle dependencies in a different way? ---
[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22952 @HeartSaVioR I'm fine with this, on the other hand if you're focusing on different things I'm happy to create a jira + PR for the separate thread thing to speed up processing. ---
[GitHub] spark pull request #23095: [SPARK-23886][SS] Update query status for Continu...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23095#discussion_r235781694 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -117,6 +117,7 @@ class ContinuousExecution( // For at least once, we can just ignore those reports and risk duplicates. commitLog.getLatest() match { case Some((latestEpochId, _)) => +updateStatusMessage(s"Getting offsets from latest epoch $latestEpochId") --- End diff -- Fixed. ---
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r235779789 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { +sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined && + sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL") --- End diff -- Yeah, SSL 2-way authentication was not covered, though it's not that hard to add. Using the enum is definitely worthwhile, but I hadn't found it until you linked it here. I've adapted `KafkaDelegationTokenProvider` and `KafkaTokenUtil` to use them. ---
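The reflection pattern in the diff above — resolving `TokenUtil.obtainToken` by name so the core module carries no compile-time dependency on the kafka module — can be demonstrated in isolation. `java.lang.Integer.parseInt` stands in here for the real target method:

```scala
// Minimal demonstration of the reflective-invocation pattern used above:
// load a class by name and invoke a method on it at runtime, avoiding a
// compile-time dependency. java.lang.Integer stands in for
// org.apache.spark.sql.kafka010.TokenUtil.
def invokeStatic(className: String, method: String, arg: String): AnyRef = {
  val clazz = Class.forName(className)
  val m = clazz.getMethod(method, classOf[String])
  m.invoke(null, arg) // null receiver: the target is a static method
}
```

The trade-off is the usual one with reflection: a typo in the class or method name only surfaces at runtime, which is why the provider above wraps the call in a `NonFatal` catch and logs instead of failing.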
[GitHub] spark issue #21412: [SPARK-18805][DStream] Avoid StackOverflowError while ge...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/21412 Don't know what the issue is, but I'm almost certain catching `StackOverflowError` shouldn't be the solution. ---
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r235314493 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -257,16 +258,64 @@ class FileStreamSource( * equal to `end` and will only request offsets greater than `end` in the future. */ override def commit(end: Offset): Unit = { -// No-op for now; FileStreamSource currently garbage-collects files based on timestamp -// and the value of the maxFileAge parameter. +def move(entry: FileEntry, baseArchiveDirPath: String): Unit = { + val curPath = new Path(entry.path) + val curPathUri = curPath.toUri + + val newPath = new Path(baseArchiveDirPath + curPathUri.getPath) + if (!fs.exists(newPath.getParent)) { --- End diff -- These fs operations can also throw exceptions. Why not cover these with a try as well? ---
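The reviewer's point — wrap all of the filesystem calls, not just the final rename, in one try so a transient failure only logs a warning — can be sketched as below. The function-typed parameters (`fsExists`, `fsMkdirs`, `fsRename`) are hypothetical stand-ins for the Hadoop `FileSystem` calls so the sketch stays self-contained:

```scala
import scala.util.control.NonFatal

// Hedged sketch: every FS operation sits inside the same try, so an exception
// from exists/mkdirs is handled the same way as one from rename.
def archiveFile(
    path: String,
    baseArchiveDir: String,
    fsExists: String => Boolean,
    fsMkdirs: String => Boolean,
    fsRename: (String, String) => Boolean): Boolean = {
  try {
    val newPath = baseArchiveDir + path
    val parent = newPath.substring(0, newPath.lastIndexOf('/'))
    if (!fsExists(parent) && !fsMkdirs(parent)) {
      println(s"Fail to create archive directory $parent")
      false
    } else {
      fsRename(path, newPath)
    }
  } catch {
    case NonFatal(e) =>
      // Archiving is best-effort: log and move on rather than failing the batch.
      println(s"Fail to move $path, skipping: ${e.getMessage}")
      false
  }
}
```

An `exists` call that throws is then reported as a skipped file instead of propagating out of `commit`.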
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r235312035 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala --- @@ -74,6 +76,43 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging */ val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false) + /** + * The archive directory to move completed files. The option will be only effective when + * "cleanSource" is set to "archive". + * + * Note that the completed file will be moved to this archive directory with respecting to + * its own path. + * + * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive + * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt". + */ + val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir") + + /** + * Defines how to clean up completed files. Available options are "archive", "delete", "no_op". + */ + val cleanSource: CleanSourceMode.Value = { +val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString) + .toUpperCase(Locale.ROOT) + +val matchedModeOpt = CleanSourceMode.values.find(_.toString == modeStrOption) +if (matchedModeOpt.isEmpty) { --- End diff -- This can be simplified to something like: ``` matchedModeOpt match { case None => throw new IllegalArgumentException(s"Invalid mode for clean source option $modeStrOption." + s" Must be one of ${CleanSourceMode.values.mkString(",")}") case Some(matchedMode) => if (matchedMode == CleanSourceMode.ARCHIVE && sourceArchiveDir.isEmpty) { throw new IllegalArgumentException("Archive mode must be used with 'sourceArchiveDir' " + "option.") } matchedMode } ``` ---
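A self-contained version of the suggested pattern — resolving the option string against an enumeration and validating the archive-mode precondition in a single match — looks like this. It mirrors the reviewer's snippet; it is not the exact Spark source:

```scala
import java.util.Locale

// Illustrative enumeration matching the modes named in the diff.
object CleanSourceMode extends Enumeration {
  val ARCHIVE, DELETE, NO_OP = Value
}

def parseCleanSource(
    modeStr: String,
    sourceArchiveDir: Option[String]): CleanSourceMode.Value = {
  CleanSourceMode.values.find(_.toString == modeStr.toUpperCase(Locale.ROOT)) match {
    case None =>
      throw new IllegalArgumentException(
        s"Invalid mode for clean source option $modeStr." +
          s" Must be one of ${CleanSourceMode.values.mkString(",")}")
    case Some(CleanSourceMode.ARCHIVE) if sourceArchiveDir.isEmpty =>
      // Archive mode is meaningless without a destination directory.
      throw new IllegalArgumentException(
        "Archive mode must be used with 'sourceArchiveDir' option.")
    case Some(mode) => mode
  }
}
```

Folding the precondition into a guarded case removes the separate `isEmpty` branch while keeping every outcome of the match explicit.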
[GitHub] spark pull request #23095: [SPARK-23886][SS] Update query status for Continu...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23095#discussion_r235108980 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala --- @@ -28,9 +28,10 @@ import org.apache.spark.annotation.InterfaceStability * Reports information about the instantaneous status of a streaming query. * * @param message A human readable description of what the stream is currently doing. - * @param isDataAvailable True when there is new data to be processed. + * @param isDataAvailable True when there is new data to be processed. Doesn't apply + *to ContinuousExecution and always false. --- End diff -- Changed. ---
[GitHub] spark issue #23095: [SPARK-23886][SS] Update query status for ContinuousExec...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/23095 cc @jose-torres @HeartSaVioR @attilapiros ---
[GitHub] spark pull request #23095: [SPARK-23886][SS] Update query status for Continu...
GitHub user gaborgsomogyi opened a pull request: https://github.com/apache/spark/pull/23095 [SPARK-23886][SS] Update query status for ContinuousExecution ## What changes were proposed in this pull request? Added query status updates to ContinuousExecution. ## How was this patch tested? Existing unit tests + added ContinuousQueryStatusAndProgressSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/gaborgsomogyi/spark SPARK-23886 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23095.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23095 commit 5b743abcaeea010e166d20f849af99f3ad5b842a Author: Gabor Somogyi Date: 2018-11-19T16:02:29Z [SPARK-23886][SS] Update query status for ContinuousExecution ---
[GitHub] spark pull request #23092: [SPARK-26094][CORE][STREAMING] createNonEcFile cr...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23092#discussion_r234919472 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -471,7 +471,11 @@ object SparkHadoopUtil { try { // Use reflection as this uses apis only avialable in hadoop 3 val builderMethod = fs.getClass().getMethod("createFile", classOf[Path]) - val builder = builderMethod.invoke(fs, path) + // the builder api does not resolve relative paths, nor does it create parent dirs, while + // the old api does. + fs.mkdirs(path.getParent()) --- End diff -- Some error handling would be good if `mkdirs` returns false. ---
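The suggested error handling — treating a `false` return from `mkdirs` as a failure instead of silently continuing — can be sketched like this. The `mkdirs` function parameter is a hypothetical stand-in for `FileSystem#mkdirs`, which signals failure via its boolean return value rather than an exception:

```scala
import java.io.IOException

// Hedged sketch: surface a false mkdirs result as an error before the
// subsequent createFile call can fail with a less helpful message.
def ensureParentDir(mkdirs: String => Boolean, parent: String): Unit = {
  if (!mkdirs(parent)) {
    throw new IOException(s"Failed to create parent directory $parent")
  }
}
```

Checking the return value here converts a later, harder-to-diagnose write failure into an immediate error naming the directory that could not be created.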
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r234543403

--- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
+    override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
+    val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
+    val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+    val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
+    val token = createResult.delegationToken().get()
+    printToken(token)
+
+    (new Token[KafkaDelegationTokenIdentifier](
+      token.tokenInfo.tokenId.getBytes,
+      token.hmacAsBase64String.getBytes,
+      TOKEN_KIND,
+      TOKEN_SERVICE
+    ), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = {
+    val adminClientProperties = new Properties
+
+    val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+    require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " +
+      "servers not configured.")
+
+    adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get)
+
+    val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+    adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol)
+    if (protocol.endsWith("SSL")) {
+      logInfo("SSL protocol detected.")
+      sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
+        adminClientProperties.put("ssl.truststore.location", truststoreLocation)
+      }
+      sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
+        adminClientProperties.put("ssl.truststore.password", truststorePassword)
+      }
+    } else {
+      logWarning("Obtaining kafka delegation token through plain communication channel. Please " +
+        "consider the security impact.")
+    }
+
+    // There are multiple possibilities to log in:

--- End diff --

I've spoken with them and they confirmed what is stated in the official API documentation:
https://kafka.apache.org/documentation/#security_sasl_kerberos
There is no such possibility.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
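The "no such possibility" above refers to a Kerberos login without any JAAS entry; what the token provider does instead is generate the JAAS entry dynamically. A minimal sketch of the kind of `sasl.jaas.config` value involved (my illustration, not code from this PR; the helper name and parameters are assumptions):

```scala
// Sketch only -- not PR code. Builds a dynamic JAAS configuration value of the
// form Kafka accepts via its documented `sasl.jaas.config` client property,
// replacing a file-based 'KafkaClient' JAAS section. Parameter names are
// illustrative assumptions.
def keytabJaasConfig(keytab: String, principal: String): String =
  s"""com.sun.security.auth.module.Krb5LoginModule required
     | useKeyTab=true
     | serviceName="kafka"
     | keyTab="$keytab"
     | principal="$principal";""".stripMargin
```

Kafka parses such a value the same way it parses a `KafkaClient` section of a JAAS file, so no `java.security.auth.login.config` system property is needed.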
[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.
Github user gaborgsomogyi commented on the issue:
https://github.com/apache/spark/pull/22598

retest this, please
[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.
Github user gaborgsomogyi commented on the issue:
https://github.com/apache/spark/pull/22598

retest this, please
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r234043117

--- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
(same diff context as the first KafkaTokenUtil.scala quote above)
+    // There are multiple possibilities to log in:

--- End diff --

OK, speaking with them...
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r234037129

--- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
(same diff context as the first KafkaTokenUtil.scala quote above)
+    // There are multiple possibilities to log in:

--- End diff --

I've just double-checked: in this case Kafka looks for a JAAS entry but will not find it, so it throws this exception:
```
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
```
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r233934316

--- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw exception") {
+    val thrown = intercept[IllegalArgumentException] {
+      KafkaTokenUtil.createAdminClientProperties(sparkConf)
+    }
+    assert(thrown.getMessage contains
+      "Tried to obtain kafka delegation token but bootstrap servers not configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take over truststore config") {
+    sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+    sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+    sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+    sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation)
+
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+    assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+      === bootStrapServers)
+    assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+      === plainSecurityProtocol)
+    assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+    assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should take over truststore config") {
+    sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+    sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+    sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+    sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+    assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+      === bootStrapServers)
+    assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+      === sslSecurityProtocol)
+    assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation)
+    assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword)
+  }
+
+  test("createAdminClientProperties without keytab should not set dynamic jaas config") {
+    sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+    sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+    assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP
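The truststore behaviour these tests pin down can be sketched without the Kafka client classes (the helper below is my illustration, not the PR's code; the plain string keys match Kafka's documented configuration key names):

```scala
import java.util.Properties

// Sketch of the property-building logic under test: bootstrap servers are
// mandatory, and truststore settings are only forwarded for SSL-based
// protocols, matching the "with/without SSL protocol ... truststore config"
// test cases above.
def adminClientProps(
    bootstrapServers: Option[String],
    protocol: String,
    truststoreLocation: Option[String],
    truststorePassword: Option[String]): Properties = {
  require(bootstrapServers.nonEmpty,
    "Tried to obtain kafka delegation token but bootstrap servers not configured.")
  val props = new Properties
  props.put("bootstrap.servers", bootstrapServers.get)
  props.put("security.protocol", protocol)
  if (protocol.endsWith("SSL")) {
    truststoreLocation.foreach(v => props.put("ssl.truststore.location", v))
    truststorePassword.foreach(v => props.put("ssl.truststore.password", v))
  }
  props
}
```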
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r233898001

--- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
(same diff context as the first KafkaTokenUtil.scala quote above)
+    // There are multiple possibilities to log in:

--- End diff --

This code doesn't try to log in but provides a dynamic JAAS configuration to Kafka. In the original implementation only file-based JAAS was possible; now I've extended it to make things easier for the user.

> why is that necessary?

This eliminates the need to provide a file-based JAAS configuration.
Old way: https://github.com/gaborgsomogyi/spark-structured-secure-kafka-app
New way: https://github.com/gaborgsomogyi/spark-structured-secure-dt-kafka-app

> won't this fail
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r233888412

--- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala ---
(same diff context as the earlier KafkaTokenUtilSuite.scala quote)
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r233858832

--- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala ---
(same diff context as the earlier KafkaTokenUtilSuite.scala quote)
+  test("createAdminClientProperties with SSL protocol should take over truststore config") {

--- End diff --

Fixed.
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r233858781

--- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala ---
(same diff context as the earlier KafkaTokenUtilSuite.scala quote)
+  test("createAdminClientProperties without SSL protocol should not take over truststore config") {

--- End diff --

Fixed.
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r233858643

--- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
(same diff context as the first KafkaTokenUtil.scala quote above)
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")

--- End diff --

Fixed.
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r233858735

--- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
(same diff context as the first KafkaTokenUtil.scala quote above)
+      logInfo("SSL protocol detected.")

--- End diff --

Fixed.
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233858687 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233858574 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import scala.language.existentials +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + logDebug("Attempting to fetch Kafka security token.") + val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf) + creds.addToken(token.getService, token) + return Some(nextRenewalDate) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} +None + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { +sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined && --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233857377 --- Diff: core/pom.xml --- @@ -408,6 +408,19 @@ provided +
[GitHub] spark issue #22331: [SPARK-25331][SS] Make FileStreamSink ignore partitions ...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22331 I've taken a look at this and I think the issue is solved in the mentioned PR but not yet documented. If somebody would like to use the output directory of a Spark application which uses a file sink (with exactly-once semantics), they must read the metadata first to get the list of valid files. Considering this, the PR can be closed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
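As a rough illustration of the metadata-first read described here: the file sink's `_spark_metadata` log lists every file a committed batch produced, and only those files are valid output. The sketch below is a hedged simplification (invented paths, a reduced JSON layout, and a naive regex instead of Spark's real `FileStreamSinkLog` parser):

```scala
object MetadataSketch {
  // Pull valid output paths from sink metadata lines: skip the version
  // header, then extract the "path" field from each JSON entry.
  private val pathRe = """"path":"([^"]+)"""".r

  def validFiles(metadataLines: Seq[String]): Seq[String] =
    metadataLines.drop(1).flatMap(l => pathRe.findFirstMatchIn(l).map(_.group(1)))

  def main(args: Array[String]): Unit = {
    // Illustrative _spark_metadata content; real entries carry more
    // fields (size, modificationTime, action) and these paths are invented.
    val lines = Seq(
      "v1",
      """{"path":"hdfs://out/part-00000.parquet","action":"add"}""",
      """{"path":"hdfs://out/part-00001.parquet","action":"add"}"""
    )
    println(validFiles(lines).mkString("\n"))
  }
}
```

Anything in the output directory not listed in the metadata (e.g. leftovers from failed batches) should be ignored by downstream readers.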
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r232033107 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- Using full-reflection would make the code hell complicated but your other suggestion about provided scope is really good. This way the copy-paste can be avoided. Changed accordingly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r231458881 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- In general if we're not moving this class into kafka area the only change which has to be done is to add another package from where the `TokenUtil` can be loaded. Otherwise `KafkaDelegationTokenProvider` has to be copied from `spark-sql-kafka...` to `spark-streaming-kafka...`. I think leaving as it is makes it more simple. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r231126695 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- That will make `KafkaDelegationTokenProvider` cleaner but has a couple of drawbacks: * It just moves the reflection into `HadoopDelegationTokenManager` * It has to be copied when delegation tokens are introduced in DStreams --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
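For context on the reflection being weighed in this thread, the load-by-name pattern from the diff can be shown in isolation. Since `TokenUtil` lives in the kafka module and isn't compilable here, this sketch substitutes `java.lang.Integer.parseInt` as a stand-in static method; only the `loadClass`/`getMethod`/`invoke(null, ...)` shape is taken from the diff, everything else is an assumption:

```scala
object ReflectionSketch {
  // Mirrors the loadClass/getMethod/invoke(null, ...) shape from the diff;
  // java.lang.Integer.parseInt stands in for TokenUtil.obtainToken here.
  def invokeStatic(className: String, methodName: String, arg: String): Any = {
    val clazz = getClass.getClassLoader.loadClass(className)
    val method = clazz.getMethod(methodName, classOf[String])
    // A null receiver is fine for static methods; a Scala object's
    // methods are reachable the same way through its static forwarders.
    method.invoke(null, arg)
  }

  def main(args: Array[String]): Unit =
    println(invokeStatic("java.lang.Integer", "parseInt", "42"))  // prints 42
}
```

The upside of this pattern is that core never needs a compile-time dependency on the kafka module; the downside, as discussed above, is that the reflection has to live somewhere.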
[GitHub] spark issue #22882: [SPARK-25871][STREAMING][WIP] Don't use EC for streaming...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22882 I understand the problem; my question is more like why [98204e6](https://github.com/apache/spark/commit/98204e6bcb840f1a47e1a3bd73da5fd7c9b22bcd) is not enough in the PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22882: [SPARK-25871][STREAMING][WIP] Don't use EC for streaming...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22882 I know it's WIP but I'm just wondering why the whole patch is needed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r231042582 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None + } + + override def delegationTokensRequired( --- End diff -- Is there any news on this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r231041173 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- Checking it... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r230683713 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager( private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = { val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++ safeCreateProvider(new HiveDelegationTokenProvider) ++ - safeCreateProvider(new HBaseDelegationTokenProvider) + safeCreateProvider(new HBaseDelegationTokenProvider) ++ + safeCreateProvider(new KafkaDelegationTokenProvider) --- End diff -- We've considered turning it off by default and came to the conclusion Marcelo described, which I think still stands. The PR says documentation is not covered because the design can change. My plan is to add it in a separate PR once the feature is merged. Regarding what to document, I think the Kafka integration guide should cover everything; it already covers that the jar should be on the path. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
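The `safeCreateProvider` wrapper used in this diff is what lets the Kafka provider stay on by default while tolerating a missing kafka jar on the classpath. Its body isn't quoted in this thread, so the following is a hedged reconstruction of the idea rather than Spark's actual implementation:

```scala
object SafeProviderSketch {
  trait Provider { def serviceName: String }

  // `create` is by-name, so a NoClassDefFoundError thrown while
  // constructing a provider whose optional jar is absent is raised
  // inside the try and the provider is simply skipped.
  def safeCreateProvider(create: => Provider): Option[Provider] =
    try Some(create)
    catch {
      case t: Throwable =>
        println(s"Provider unavailable, skipping: ${t.getClass.getSimpleName}")
        None
    }

  def main(args: Array[String]): Unit = {
    val providers = Seq(
      safeCreateProvider(new Provider { def serviceName = "kafka" }),
      safeCreateProvider(throw new NoClassDefFoundError("kafka/clients"))
    ).flatten
    println(providers.map(_.serviceName).mkString(","))  // kafka
  }
}
```

This is why keeping the provider enabled by default is low risk: deployments without the kafka classes just log and skip it.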
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r230680318 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { +sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined && + sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL") --- End diff -- This condition means `if (bootstrap servers configured and (protocol == SASL_PLAINTEXT or protocol == SASL_SSL))`. Why do you think SSL is not covered? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
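To make the condition concrete, here are Kafka's four security protocol names run through the two string checks discussed in this thread: `startsWith("SASL")` from `delegationTokensRequired` and `endsWith("SSL")` from `createAdminClientProperties`. This is a standalone illustration, not the actual Spark code:

```scala
object ProtocolSketch {
  // All four Kafka security protocol names.
  val protocols = Seq("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL")

  // Token obtainment is attempted only for the SASL variants...
  def tokenRequired: Seq[String] = protocols.filter(_.startsWith("SASL"))

  // ...while truststore settings are applied for anything SSL-based.
  def sslConfigNeeded: Seq[String] = protocols.filter(_.endsWith("SSL"))

  def main(args: Array[String]): Unit = {
    println(tokenRequired.mkString(","))    // SASL_PLAINTEXT,SASL_SSL
    println(sslConfigNeeded.mkString(","))  // SSL,SASL_SSL
  }
}
```

So `SASL_SSL` satisfies both predicates, which is why the token path and the SSL truststore path are not mutually exclusive.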
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r230679147 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None --- End diff -- That's a nice catch, fixing it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22598 retest this, please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r226659261 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r226568611 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) --- End diff -- The possibility is there, but it doesn't throw an exception. Input: `dateFormat.format(-1)` Output: `1970-01-01T00:59` It will end up as an invalid token, which can be spotted in the log printouts. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
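The reviewer's point above, that `SimpleDateFormat.format` accepts a sentinel like `-1` without throwing, can be reproduced with a small standalone Scala sketch (not part of the PR; the time zone is pinned to UTC here for determinism, while the reviewer's `1970-01-01T00:59` output suggests a UTC+1 default zone):

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object InvalidTimestampDemo {
  def main(args: Array[String]): Unit = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
    // Pin the zone so the output is deterministic across machines.
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
    // format() does not validate its input: -1 is simply one millisecond
    // before the epoch and is silently rendered instead of raising.
    println(dateFormat.format(-1L)) // 1969-12-31T23:59
  }
}
```

This is why an invalid token timestamp only shows up as a nonsensical date in the debug printout rather than as an exception.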
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r225111947 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r225111758 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r225109193 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -647,4 +647,42 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED = +ConfigBuilder("spark.kafka.delegation.token.enabled") + .doc("Set to 'true' for obtaining delegation token from kafka.") + .booleanConf + .createWithDefault(false) + + private[spark] val KAFKA_BOOTSTRAP_SERVERS = +ConfigBuilder("spark.kafka.bootstrap.servers") --- End diff -- It exists and is described in the SPIP: `spark.security.credentials.kafka.enabled`
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223987644 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/TokenUtilSuite.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class TokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + TokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation) + +val adminClientProperties = TokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + .equals(bootStrapServers)) --- End diff -- No particular reason, changed.
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223987456 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.kafka.common.security.scram.ScramLoginModule + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object KafkaSecurityHelper extends Logging { + def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = { +val keytab = sparkConf.get(KEYTAB) +if (keytab.isDefined) { + val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME) + require(serviceName.nonEmpty, "Kerberos service name must be defined") + val principal = sparkConf.get(PRINCIPAL) + require(principal.nonEmpty, "Principal must be defined") + + val params = +s""" +|${getKrb5LoginModuleName} required +| useKeyTab=true +| serviceName="${serviceName.get}" +| keyTab="${keytab.get}" +| principal="${principal.get}"; +""".stripMargin.replace("\n", "") + logDebug(s"Krb JAAS params: $params") + Some(params) +} else { + None +} + } + + private def getKrb5LoginModuleName(): String = { --- End diff -- Added.
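The JAAS string assembled by `getKeytabJaasParams` above can be reconstructed as a standalone sketch. This is a hedged illustration, not Spark's actual code: the Krb5 login module name is passed in as a parameter here because the real `getKrb5LoginModuleName` helper resolves it per JVM vendor, and all argument values are example placeholders.

```scala
object JaasParamsSketch {
  // Builds a one-line dynamic JAAS configuration string of the shape used
  // in the quoted diff: module name, useKeyTab, serviceName, keyTab and
  // principal, with the newlines of the triple-quoted literal stripped.
  def krbJaasParams(krb5Module: String, serviceName: String,
                    keytab: String, principal: String): String =
    s"""
       |$krb5Module required
       | useKeyTab=true
       | serviceName="$serviceName"
       | keyTab="$keytab"
       | principal="$principal";
       """.stripMargin.replace("\n", "")

  def main(args: Array[String]): Unit = {
    // Example values only; "kafka" and the paths are placeholders.
    println(krbJaasParams("com.sun.security.auth.module.Krb5LoginModule",
      "kafka", "/path/to/keytab", "user@DOMAIN.COM"))
  }
}
```

The `stripMargin.replace("\n", "")` step matters because the string is later handed to Kafka's `sasl.jaas.config`-style dynamic configuration, which expects a single line.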
[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/22598 retest this, please
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223354399 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") +log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( + "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) +val tokenInfo = token.tokenInfo +log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( + tokenInfo.tokenId, + tokenInfo.owner, + tokenInfo.renewersAsString, + dateFormat.format(tokenInfo.issueTimestamp), + dateFormat.format(tokenInfo.expiryTimestamp), + dateFormat.format(tokenInfo.maxTimestamp))) + } + + def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = 
sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + + val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION) + if (truststoreLocation.nonEmpty) { +adminClientProperties.put("ssl.truststore.location", truststoreLocation.get) + } else { +logInfo("No truststore location set for SSL.") + } + + val truststorePassword = sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD) + if (truststorePassword.nonEmpty) { --- End diff -- Fixed.
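The fix discussed here replaced the verbose `if (opt.nonEmpty) props.put(key, opt.get)` pattern with `Option.foreach`. A minimal standalone sketch of the resulting idiom (the object and method names are invented for illustration):

```scala
import java.util.Properties

object TruststorePropsSketch {
  // Option.foreach copies each truststore setting only when it is
  // configured; unset options simply leave the property absent.
  def withTruststore(props: Properties,
                     location: Option[String],
                     password: Option[String]): Properties = {
    location.foreach(props.put("ssl.truststore.location", _))
    password.foreach(props.put("ssl.truststore.password", _))
    props
  }

  def main(args: Array[String]): Unit = {
    val p = withTruststore(new Properties, Some("/path/to/trustStore"), None)
    println(p.getProperty("ssl.truststore.location")) // /path/to/trustStore
    println(p.getProperty("ssl.truststore.password")) // null
  }
}
```

Compared with the quoted `if (truststoreLocation.nonEmpty) ... else logInfo(...)` version, this drops the `.get` calls and the per-branch logging while keeping the same behavior for set and unset options.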
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223354478 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") +log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( + "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) +val tokenInfo = token.tokenInfo +log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( + tokenInfo.tokenId, + tokenInfo.owner, + tokenInfo.renewersAsString, + dateFormat.format(tokenInfo.issueTimestamp), + dateFormat.format(tokenInfo.expiryTimestamp), + dateFormat.format(tokenInfo.maxTimestamp))) + } + + def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = 
sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + + val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION) + if (truststoreLocation.nonEmpty) { +adminClientProperties.put("ssl.truststore.location", truststoreLocation.get) + } else { +logInfo("No truststore location set for SSL.") + } + + val truststorePassword = sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD) + if (truststorePassword.nonEmpty) { +adminClientProperties.put("ssl.truststore.password", truststorePassword.get) + } else { +logInfo("No truststore password set for SSL.") + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider it's security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration