[GitHub] spark pull request #23274: [SPARK-26322][SS] Add spark.kafka.token.sasl.mech...

2018-12-10 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23274#discussion_r240216147
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -642,9 +642,9 @@ This way the application can be configured via Spark 
parameters and may not need
 configuration (Spark can use Kafka's dynamic JAAS configuration feature). 
For further information
 about delegation tokens, see [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
 
-The process is initiated by Spark's Kafka delegation token provider. When 
`spark.kafka.bootstrap.servers`,
+The process is initiated by Spark's Kafka delegation token provider. When 
`spark.kafka.bootstrap.servers` set,
 Spark considers the following log in options, in order of preference:
-- **JAAS login configuration**
+- **JAAS login configuration**, please see example below.
--- End diff --

Added this small pointer to make things clearer.


---




[GitHub] spark pull request #23274: [SPARK-26322][SS] Add spark.kafka.token.sasl.mech...

2018-12-10 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23274#discussion_r240215906
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -642,9 +642,9 @@ This way the application can be configured via Spark 
parameters and may not need
 configuration (Spark can use Kafka's dynamic JAAS configuration feature). 
For further information
 about delegation tokens, see [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
 
-The process is initiated by Spark's Kafka delegation token provider. When 
`spark.kafka.bootstrap.servers`,
+The process is initiated by Spark's Kafka delegation token provider. When 
`spark.kafka.bootstrap.servers` set,
--- End diff --

I've found this wording issue, so I fixed it here.


---




[GitHub] spark pull request #23274: [SPARK-26322][SS] Add spark.kafka.token.sasl.mech...

2018-12-10 Thread gaborgsomogyi
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/23274

[SPARK-26322][SS] Add spark.kafka.token.sasl.mechanism to ease delegation 
token configuration.

## What changes were proposed in this pull request?

When a Kafka delegation token is obtained, the SCRAM `sasl.mechanism` has to be 
configured for authentication. This can be configured on the related 
source/sink, which is inconvenient from the user's perspective. Such granularity is 
not required, and this configuration can be implemented with one central 
parameter.

In this PR, `spark.kafka.token.sasl.mechanism` is added to configure this 
centrally (default: `SCRAM-SHA-512`).
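
For illustration, a minimal sketch of how the central parameter could be used (assuming the behavior described above; the session setup itself is just an example, not code from this PR):

```
import org.apache.spark.sql.SparkSession

// Set the SASL mechanism once, centrally, instead of repeating
// kafka.sasl.mechanism on every source/sink.
val spark = SparkSession.builder()
  .appName("kafka-delegation-token-example")
  .config("spark.kafka.token.sasl.mechanism", "SCRAM-SHA-256") // default: SCRAM-SHA-512
  .getOrCreate()
```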

## How was this patch tested?

Existing unit tests + on cluster.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborgsomogyi/spark SPARK-26322

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23274.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23274


commit 320040a90c58c87ce1b21ba6fdc16b703122e8b1
Author: Gabor Somogyi 
Date:   2018-12-07T15:02:17Z

[SPARK-26322][SS] Add spark.kafka.token.sasl.mechanism to ease delegation 
token configuration.




---




[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-12-10 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23156
  
BTW, I'm coming back to your clean-up PR, but it takes some time to switch 
context :)


---




[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-12-10 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23156
  
Ah, OK. This solution was agreed on with him in 
https://github.com/apache/spark/pull/20936.


---




[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-12-10 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23156
  
I thought this part was not affected. Who is leading it? I'm asking because I 
haven't seen progress anywhere.


---




[GitHub] spark issue #23254: [SPARK-26304][SS] Add default value to spark.kafka.sasl....

2018-12-07 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23254
  
cc @vanzin


---




[GitHub] spark issue #23254: [SPARK-26304][SS] Add default value to spark.kafka.sasl....

2018-12-07 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23254
  
Unrelated, filed jira: https://issues.apache.org/jira/browse/SPARK-26306


---




[GitHub] spark issue #23254: [SPARK-26304][SS] Add default value to spark.kafka.sasl....

2018-12-07 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23254
  
retest this please


---




[GitHub] spark pull request #23254: [SPARK-26304][SS] Add default value to spark.kafk...

2018-12-07 Thread gaborgsomogyi
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/23254

[SPARK-26304][SS] Add default value to 
spark.kafka.sasl.kerberos.service.name parameter

## What changes were proposed in this pull request?

`spark.kafka.sasl.kerberos.service.name` is an optional parameter, but most of 
the time the value `kafka` has to be set. As I've written in the JIRA, the 
reasoning behind this is the following:
* Kafka's configuration guide suggests the same value: 
https://kafka.apache.org/documentation/#security_sasl_kerberos_brokerconfig
* It would be easier for Spark users, since less configuration is needed
* Other streaming engines do the same

In this PR I've changed the parameter from optional to `WithDefault` and 
set `kafka` as the default value.
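
As a rough sketch of what `WithDefault` means here, using Spark's `ConfigBuilder` pattern (the entry name matches the parameter above, but the value name and doc string are assumptions, not the merged code):

```
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical sketch: the config entry goes from createOptional to createWithDefault.
private[spark] val KAFKA_KERBEROS_SERVICE_NAME =
  ConfigBuilder("spark.kafka.sasl.kerberos.service.name")
    .doc("Kerberos service name used by the Kafka brokers.")
    .stringConf
    .createWithDefault("kafka") // previously: .createOptional
```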

## How was this patch tested?

Available unit tests + on cluster.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborgsomogyi/spark SPARK-26304

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23254


commit 3f56e78e45e9c983816d8222e06d33092ecb2993
Author: Gabor Somogyi 
Date:   2018-12-06T15:37:35Z

[SPARK-26304][SS] Add default value to 
spark.kafka.sasl.kerberos.service.name parameter




---




[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-12-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23156
  
@arunmahadevan as I understand it, this is more about renaming the config than 
changing what the PR basically does. Have I understood it correctly?

Having backpressure instead of stopping the query was already agreed on in 
other PRs; please check them. If the backlog reaches 10k items, there is no 
way back.


---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r239480501
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,199 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**
+- **JAAS login configuration**
+
+### Delegation token
+
+This way the application can be configured via Spark parameters and may 
not need JAAS login
+configuration (Spark can use Kafka's dynamic JAAS configuration feature). 
For further information
+about delegation tokens, see [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+The process is initiated by Spark's Kafka delegation token provider. When 
`spark.kafka.bootstrap.servers`,
+Spark considers the following log in options, in order of preference:
+- **JAAS login configuration**
+- **Keytab file**, such as,
+
+  ./bin/spark-submit \
+  --keytab  \
+  --principal  \
+  --conf spark.kafka.bootstrap.servers= \
+  ...
+
+- **Kerberos credential cache**, such as,
+
+  ./bin/spark-submit \
+  --conf spark.kafka.bootstrap.servers= \
+  ...
+
+The Kafka delegation token provider can be turned off by setting 
`spark.security.credentials.kafka.enabled` to `false` (default: `true`).
+
+Spark can be configured to use the following authentication protocols to 
obtain token (it must match with
+Kafka broker configuration):
+- **SASL SSL (default)**
+- **SSL**
+- **SASL PLAINTEXT (for testing)**
+
+After obtaining delegation token successfully, Spark distributes it across 
nodes and renews it accordingly.
+Delegation token uses `SCRAM` login module for authentication and because 
of that the appropriate
+`sasl.mechanism` has to be configured on source/sink:
+
+
+
+{% highlight scala %}
+
+// Setting on Kafka Source for Streaming Queries
--- End diff --

SCRAM-SHA-256 and SCRAM-SHA-512 are supported at the moment. I've added a note that it 
needs to match.

This is definitely an area for improvement, and I will file a JIRA soon (among 
other things, because I've found more simplifications from the user's perspective).
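
A minimal sketch of that matching on the source side (option names follow the Kafka integration guide quoted above; the broker address and topic are placeholders):

```
// The mechanism set here must match the SCRAM mechanism configured on the broker.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") // or SCRAM-SHA-256
  .option("subscribe", "topic1")
  .load()
```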


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-12-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23156
  
@arunmahadevan I don't fully understand your comment:

> Rather than controlling the queue sizes it would be better to limit the 
max epoch backlog and fail the query once that threshold is reached.

I've written the following in the PR description:
> If the related threshold reached then the query will stop with 
IllegalStateException.

AFAIK `max epoch backlog` == `epochsWaitingToBeCommitted`, which is a queue,
but that's not the only unbounded part of `EpochCoordinator` (please see the 
additional unit tests).
As a result, I've limited `partitionOffsets` and `partitionCommits` as well.
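
For illustration, a sketch of the kind of bound described here (the names and threshold plumbing are assumptions, not the PR's code):

```
import scala.collection.mutable

// Hypothetical guard: fail fast with IllegalStateException instead of letting
// an EpochCoordinator-style backlog grow unbounded.
def checkBacklog[T](queue: mutable.Queue[T], maxEpochBacklog: Int): Unit = {
  if (queue.size >= maxEpochBacklog) {
    throw new IllegalStateException(
      s"Size of the epoch queue has exceeded the maximum allowed backlog of $maxEpochBacklog")
  }
}
```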



---




[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22952
  
@HeartSaVioR It was tested with S3, and the trick is to have a HUGE number of 
files. Listing files is pathologically bad, as @steveloughran stated; globbing is 
even worse.


---




[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22952
  
@HeartSaVioR It's a question of what counts as not a big deal; I've seen an ~1 hour glob 
request when a huge number of files was stored :)
If file moves are even worse, that's one more reason to move this to a separate thread.



---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238995809
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
When `spark.kafka.bootstrap.servers`
+  set Spark looks for authentication information in the following order 
and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Kafka delegation token provider can be turned off by setting 
`spark.security.credentials.kafka.enabled` to `false` (default: `true`).
+
+  Spark can be configured to use the following authentication protocols to 
obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticates
+clients through certificates. Please note 2-way authentication must be 
enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos 
used for authentication but
+because there is no encryption it's only for testing purposes.
+
+  After obtaining delegation token successfully, Spark distributes it 
across nodes and renews it accordingly.
+  Delegation token uses `SCRAM` login module for authentication and 
because of that the appropriate
+  `sasl.mechanism` has to be configured on source/sink.
--- End diff --

It means exactly that. This was missing; I added it and an example.


---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238995441
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
When `spark.kafka.bootstrap.servers`
+  set Spark looks for authentication information in the following order 
and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Kafka delegation token provider can be turned off by setting 
`spark.security.credentials.kafka.enabled` to `false` (default: `true`).
--- End diff --

Fixed.


---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238995312
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
When `spark.kafka.bootstrap.servers`
+  set Spark looks for authentication information in the following order 
and choose the first available to log in:
--- End diff --

Fixed.


---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238994314
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,56 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
--- End diff --

OK, fixed.


---




[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22952
  
@HeartSaVioR @steveloughran 
As I see it, not only are `*` and `?` missing but `[]` also.

* Having a glob parser in Spark and supporting it is, I think, too heavy and 
brittle.
* Considering this, I would solve it with warnings plus a caveat message in the 
docs (mentioning the slow globbing on object stores).

As a separate off-topic note, I'm just wondering how Hadoop's globbing works if the 
expander doesn't support all the glob elements. Maybe other operators (like 
`[]`) are handled in a different code path!?



---




[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22952
  
@HeartSaVioR 
I've taken a look at the possibilities:
* 
[GlobExpander](https://github.com/apache/hadoop/blob/a55d6bba71c81c1c4e9d8cd11f55c78f10a548b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobExpander.java#L63)
 is private
* `globStatus` used recursively is not an option because of its poor performance
* `globStatus` with a limited scope can be an option, but there are cases 
where it might take considerable execution time
* Printing warnings and not moving the files is an option that seems feasible (see the sketch below)
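
A minimal sketch of the warning option (the glob character set and message are assumptions, not the eventual implementation):

```
// Hypothetical check: warn when the source path contains glob characters,
// since globbing can be pathologically slow on object stores.
val globChars = Set('*', '?', '[', ']', '{', '}')

def warnIfGlob(sourcePath: String): Unit = {
  if (sourcePath.exists(globChars.contains)) {
    Console.err.println(s"WARN: source path '$sourcePath' contains glob characters; " +
      "archiving completed files may be slow or skipped on object stores.")
  }
}
```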



---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238599018
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,56 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticates
+clients through certificates. Please note 2-way authentication must be 
enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos 
used for authentication but
+because there is no encryption it's only for testing purposes.
+
+  After obtaining delegation token successfully, Spark spreads it across 
nodes and renews it accordingly.
+  Delegation token uses `SCRAM` login module for authentication.
--- End diff --

The `SCRAM` module supports only a couple of `sasl.mechanism` values, like 
`SCRAM-SHA-256`, `SCRAM-SHA-512`, etc., which have to be configured on the 
source/sink. I've updated the description to reflect this.


---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238596541
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,56 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticates
+clients through certificates. Please note 2-way authentication must be 
enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos 
used for authentication but
+because there is no encryption it's only for testing purposes.
+
+  After obtaining delegation token successfully, Spark spreads it across 
nodes and renews it accordingly.
--- End diff --

Fixed.


---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238596372
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,56 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
--- End diff --

> "Spark supports"

Maybe `Spark can be configured to use` is better phrase.

> explaining each option here is not really that helpful

* I think the list must be kept (maybe without explanation) because if 
there is an authentication protocol in kafka it doesn't mean spark is prepared 
to use it. 

* With the explanation wanted to give a high level feeling what it's 
roughly does and Kafka's doc is there to take a deeper look. I'm neutral on 
removing them. Should we?



---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238593413
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,56 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
--- End diff --

Fixed.


---




[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-03 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22952
  
@HeartSaVioR 
Regarding the glob part, @zsxwing pointed out an important problem. Glob 
patterns are much more than checking `*` and `?`; see the link above. For simplicity, 
take this test:
```
...
  val sourcePath = "/hello/worl{d}"
  val archivePath = "/hello/world/spark"
...
```
This should throw `IllegalArgumentException` but proceeds without an 
exception.
It would be good to use a glob parser.



---




[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-03 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22952
  
@HeartSaVioR 
I've taken a deeper look at the overlap issue and found the following.

* I added an additional test which produced an odd result:
```
...
  val sourcePath = "/hello/worl"
  val archivePath = "/hello/world/spark"
...
```
This threw `IllegalArgumentException` even though `sourcePath` is 
different from `archivePath`. 
This happens without any glob magic (see the sketch at the end of this comment).

* This approach may not work if there are symlinks involved 
(`fs.makeQualified` doesn't resolve links).
  * HDFS does not support them yet, though it's on the way; see 
https://issues.apache.org/jira/browse/HADOOP-10019
  * S3 and ADLS do not support them

  All in all this part is fine now.

Checking the glob part...
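
To make the prefix pitfall concrete, a small standalone illustration (not the PR's code):

```
// "/hello/worl" is a string prefix of "/hello/world/spark" even though the
// directories do not overlap, so a naive startsWith check is a false positive.
val sourcePath  = "/hello/worl"
val archivePath = "/hello/world/spark"

assert(archivePath.startsWith(sourcePath))        // naive check: wrongly flags overlap
assert(!archivePath.startsWith(sourcePath + "/")) // component-aware check: no overlap
```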



---




[GitHub] spark issue #23195: [SPARK-26236][SS] Add kafka delegation token support doc...

2018-12-03 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23195
  
@HeartSaVioR thanks for the review!
cc @steveloughran, who may also be interested



---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-03 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238211153
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticate
+clients through certificates. Please note 2-way authentication must be 
enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos 
used for authentication but
+because there is no encryption it's only for testing purposes.
+
+  After delegation token successfully obtained Spark spreads it across 
nodes and renews it accordingly.
--- End diff --

Yeah, it's better that way, and I changed it.


---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-03 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238210009
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticate
+clients through certificates. Please note 2-way authentication must be 
enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos 
used for authentication but
+because there is no encryption it's only for testing purposes.
+
+  After delegation token successfully obtained Spark spreads it across 
nodes and renews it accordingly.
+  Delegation token uses `SCRAM` login module for authentication.
+
+  When delegation token is available for example on an executor it can be 
overridden with JAAS login
+  configuration.
+- **JAAS login configuration**: JAAS login configuration must be created 
and transferred to all
--- End diff --

Yeah, I was focusing on transferring JAAS files, which made the description 
more technical than it should be. Nice catch; fixed.


---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-03 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238209243
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticate
--- End diff --

Fixed.


---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-03 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238209492
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticate
+clients through certificates. Please note 2-way authentication must be 
enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos 
used for authentication but
+because there is no encryption it's only for testing purposes.
+
+  After delegation token successfully obtained Spark spreads it across 
nodes and renews it accordingly.
+  Delegation token uses `SCRAM` login module for authentication.
+
+  When delegation token is available for example on an executor it can be 
overridden with JAAS login
--- End diff --

Yeah, removed.


---




[GitHub] spark issue #23195: [SPARK-26236][SS] Add kafka delegation token support doc...

2018-12-01 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23195
  
cc @vanzin


---




[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-01 Thread gaborgsomogyi
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/23195

[SPARK-26236][SS] Add kafka delegation token support documentation.

## What changes were proposed in this pull request?

Kafka delegation token support was implemented in 
[PR#22598](https://github.com/apache/spark/pull/22598), but that PR didn't contain 
documentation because of the rapid changes. Now that it has been merged, 
I've documented it in this PR.

## How was this patch tested?
jekyll build + manual html check


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborgsomogyi/spark SPARK-26236

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23195.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23195


commit e53631b8f313a2b34ec38393ea14527c3f7d8458
Author: Gabor Somogyi 
Date:   2018-11-30T15:29:33Z

[SPARK-26236][SS] Add kafka delegation token support documentation.




---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237662539
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/Kafka.scala 
---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Kafka {
+
+  private[spark] val BOOTSTRAP_SERVERS =
--- End diff --

Removed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237656411
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import 
org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME}
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  override def afterEach(): Unit = {
+try {
+  resetUGI
+} finally {
+  super.afterEach()
+}
+  }
+
+  private def addTokenToUGI(): Unit = {
+val token = new Token[KafkaDelegationTokenIdentifier](
+  tokenId.getBytes,
+  tokenPassword.getBytes,
+  KafkaTokenUtil.TOKEN_KIND,
+  KafkaTokenUtil.TOKEN_SERVICE
+)
+val creds = new Credentials()
+creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
+UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
+  private def resetUGI: Unit = {
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237656366
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.{ util => ju }
+import java.text.SimpleDateFormat
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.JaasContext
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
ju.Properties = {
+val adminClientProperties = new ju.Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in, applied in the following order:
--- End diff --

Added.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237656297
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -688,4 +688,65 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
--- End diff --

Moved.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237495490
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

Now the following order applies:
* Global JVM JAAS configuration specific to Kafka (Kafka looks for a `KafkaClient` entry).
  This can be configured in several ways, not only via `java.security.auth.login.config`; the mentioned `JaasContext.loadClientContext` handles all of them.
* Keytab
* Ticket cache
I've described this in the code as well to make the intention clear; a sketch of the fallback order follows.
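
For illustration, here is a minimal sketch of that fallback order. The helper name `chooseJaasParams` and its parameters are hypothetical, not the actual Spark code; `JaasContext.loadClientContext` is the Kafka API mentioned above:
```
import java.util.{HashMap => JHashMap}

import scala.util.Try

import org.apache.kafka.common.security.JaasContext

// Hypothetical helper: decide which JAAS parameters (if any) to pass on.
def chooseJaasParams(keytab: Option[String], principal: Option[String]): Option[String] = {
  // 1. Kafka-specific global JVM configuration: loadClientContext throws when no
  //    KafkaClient entry can be resolved, so success means the global config is usable.
  val globalConfigUsable =
    Try(JaasContext.loadClientContext(new JHashMap[String, Object]())).isSuccess
  if (globalConfigUsable) {
    None // rely on the global JAAS configuration
  } else if (keytab.isDefined && principal.isDefined) {
    // 2. Keytab-based dynamic JAAS configuration
    Some(s"""com.sun.security.auth.module.Krb5LoginModule required
            |  useKeyTab=true keyTab="${keytab.get}" principal="${principal.get}";""".stripMargin)
  } else {
    // 3. Fall back to the Kerberos ticket cache
    Some("com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;")
  }
}
```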



---


[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237492537
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

Added + tested the fallback mechanism.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-28 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237234310
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

Kafka has a public API to check for client config: `JaasContext.loadClientContext`. It throws an exception when no config is provided. This part works; testing the others...


---




[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23156
  
cc @jose-torres @HeartSaVioR 


---




[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22598
  
retest this, please


---




[GitHub] spark pull request #23156: [SPARK-24063][SS] Add maximum epoch queue thresho...

2018-11-27 Thread gaborgsomogyi
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/23156

[SPARK-24063][SS] Add maximum epoch queue threshold for ContinuousExecution

## What changes were proposed in this pull request?

Continuous processing waits on epochs which are not yet complete (for example, when one partition is not making progress) and stores pending items in queues. These queues are unbounded and can easily consume all available memory. In this PR I've added the `spark.sql.streaming.continuous.epochBacklogQueueSize` configuration to make them bounded. If the related threshold is reached, the query will stop with an `IllegalStateException`. A minimal usage sketch follows.
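
For illustration, a minimal sketch of enabling the new threshold; the configuration key is the one introduced in this PR, while the application name and the value `10000` are arbitrary examples:
```
import org.apache.spark.sql.SparkSession

// Bound the epoch backlog queues; when the backlog exceeds the threshold,
// the continuous query stops with IllegalStateException.
val spark = SparkSession.builder()
  .appName("continuous-epoch-backlog-example")
  .config("spark.sql.streaming.continuous.epochBacklogQueueSize", "10000")
  .getOrCreate()
```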

## How was this patch tested?

Existing + additional unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborgsomogyi/spark SPARK-24063

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23156.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23156


commit 72733c5068d85e70e0a65b05a593c82120277622
Author: Gabor Somogyi 
Date:   2018-11-22T20:45:37Z

[SPARK-24063][SS] Add maximum epoch queue threshold for ContinuousExecution.




---




[GitHub] spark issue #23095: [SPARK-23886][SS] Update query status for ContinuousExec...

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23095
  
cc @tdas @zsxwing 


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236613589
  
--- Diff: pom.xml ---
@@ -128,6 +128,7 @@
 1.2.1.spark2
 
 1.2.1
+2.1.0
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236613501
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import 
org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME}
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  private def addTokenToUGI: Unit = {
+val token = new Token[KafkaDelegationTokenIdentifier](
+  tokenId.getBytes,
+  tokenPassword.getBytes,
+  KafkaTokenUtil.TOKEN_KIND,
+  KafkaTokenUtil.TOKEN_SERVICE
+)
+val creds = new Credentials()
+creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
+UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
+  private def resetUGI: Unit = {
+UserGroupInformation.setLoginUser(null)
+  }
+
+  test("getTokenJaasParams without token should return None") {
+val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+assert(!jaasParams.isDefined)
+  }
+
+  test("getTokenJaasParams with token no service should throw exception") {
+try {
+  addTokenToUGI
+
+  val thrown = intercept[IllegalArgumentException] {
+KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+  }
+
+  assert(thrown.getMessage contains "Kerberos service name must be 
defined")
+} finally {
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236613436
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import 
org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME}
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  private def addTokenToUGI: Unit = {
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236613351
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

The jira is: https://issues.apache.org/jira/browse/KAFKA-7677
The guys will take a look at it...

Regarding `useTgtConfig`: I've considered this, and it would make the user experience much better. On the other hand, it would close off the possibility of using a custom JAAS configuration (I don't know how many people use that). I'm fine with it, but one has to reckon with this consequence. Should we do this then?


---

[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-11-22 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22952
  
@HeartSaVioR ok, feel free to ping me if a review is needed.


---




[GitHub] spark issue #23119: [SPARK-25954][SS][FOLLOWUP][test-maven] Add Zookeeper 3....

2018-11-22 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23119
  
I've seen similar flakiness with `sbt` as well.


---




[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.

2018-11-22 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22598
  
retest this please


---




[GitHub] spark issue #23119: [SPARK-25954][SS][FOLLOWUP][TEST-MAVEN] Add Zookeeper 3....

2018-11-22 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23119
  
Wondering why the tests passed with sbt. Does sbt handle dependencies in a different way?


---




[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-11-22 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22952
  
@HeartSaVioR I'm fine with this; on the other hand, if you're focusing on different things, I'm happy to create a JIRA + PR for the separate-thread work to speed up processing.


---




[GitHub] spark pull request #23095: [SPARK-23886][SS] Update query status for Continu...

2018-11-22 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23095#discussion_r235781694
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -117,6 +117,7 @@ class ContinuousExecution(
 // For at least once, we can just ignore those reports and risk 
duplicates.
 commitLog.getLatest() match {
   case Some((latestEpochId, _)) =>
+updateStatusMessage(s"Getting offsets from latest epoch 
$latestEpochId")
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-22 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r235779789
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
+  sparkConf: SparkConf,
+  hadoopConf: Configuration): Boolean = {
+sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined &&
+  sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL")
--- End diff --

Yeah, SSL 2-way authentication was not covered, though it's not that hard to add.
Using the enum is definitely worthwhile, but I hadn't found it until you linked it here. I've adapted `KafkaDelegationTokenProvider` and `KafkaTokenUtil` to use the enum constants; a sketch of the resulting check follows.
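
For context, a sketch of what the enum-based check could look like; this is a guess at the adaptation rather than the actual patch, and the constants come from `org.apache.kafka.common.security.auth.SecurityProtocol` as imported in the quoted diff:
```
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}

// Compare against the enum names instead of a raw startsWith("SASL") check,
// which also lets SSL (2-way authentication) qualify for delegation tokens.
def protocolSupportsToken(protocol: String): Boolean =
  protocol == SASL_SSL.name || protocol == SASL_PLAINTEXT.name || protocol == SSL.name
```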


---




[GitHub] spark issue #21412: [SPARK-18805][DStream] Avoid StackOverflowError while ge...

2018-11-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/21412
  
I don't know what the issue is, but I'm almost certain that catching `StackOverflowError` shouldn't be the solution.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r235314493
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +258,64 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
+  val curPathUri = curPath.toUri
+
+  val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+  if (!fs.exists(newPath.getParent)) {
--- End diff --

These fs operations can also throw exceptions. Why not cover them with the try as well? A sketch of what I mean follows.
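
To make the suggestion concrete, a sketch of wrapping the whole sequence in the try; the helper shape is hypothetical, and the path handling follows the quoted diff:
```
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}

// exists/mkdirs/rename are grouped inside one try so that any FileSystem
// failure is handled uniformly instead of only part of the sequence.
def moveToArchive(fs: FileSystem, curPath: Path, baseArchiveDirPath: String): Unit = {
  try {
    val newPath = new Path(baseArchiveDirPath + curPath.toUri.getPath)
    if (!fs.exists(newPath.getParent)) {
      fs.mkdirs(newPath.getParent)
    }
    if (!fs.rename(curPath, newPath)) {
      System.err.println(s"Failed to archive $curPath")
    }
  } catch {
    case NonFatal(e) =>
      // Archiving is best-effort here; log and continue with the next entry.
      System.err.println(s"Failed to archive $curPath: ${e.getMessage}")
  }
}
```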


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r235312035
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
 ---
@@ -74,6 +76,43 @@ class FileStreamOptions(parameters: 
CaseInsensitiveMap[String]) extends Logging
*/
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
 
+  /**
+   * The archive directory to move completed files. The option will be 
only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory 
with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and 
the path of archive
+   * directory is "/archived/here", file will be moved to 
"/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are 
"archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+val modeStrOption = parameters.getOrElse("cleanSource", 
CleanSourceMode.NO_OP.toString)
+  .toUpperCase(Locale.ROOT)
+
+val matchedModeOpt = CleanSourceMode.values.find(_.toString == 
modeStrOption)
+if (matchedModeOpt.isEmpty) {
--- End diff --

This can be simplified to something like:
```
matchedModeOpt match {
  case None =>
    throw new IllegalArgumentException(s"Invalid mode for clean source option $modeStrOption." +
      s" Must be one of ${CleanSourceMode.values.mkString(",")}")
  case Some(matchedMode) =>
    if (matchedMode == CleanSourceMode.ARCHIVE && sourceArchiveDir.isEmpty) {
      throw new IllegalArgumentException("Archive mode must be used with 'sourceArchiveDir' " +
        "option.")
    }
    matchedMode
}
```


---




[GitHub] spark pull request #23095: [SPARK-23886][SS] Update query status for Continu...

2018-11-20 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23095#discussion_r235108980
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
 ---
@@ -28,9 +28,10 @@ import org.apache.spark.annotation.InterfaceStability
  * Reports information about the instantaneous status of a streaming query.
  *
  * @param message A human readable description of what the stream is 
currently doing.
- * @param isDataAvailable True when there is new data to be processed.
+ * @param isDataAvailable True when there is new data to be processed. 
Doesn't apply
+ *to ContinuousExecution and always false.
--- End diff --

Changed.


---




[GitHub] spark issue #23095: [SPARK-23886][SS] Update query status for ContinuousExec...

2018-11-20 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/23095
  
cc @jose-torres @HeartSaVioR @attilapiros 


---




[GitHub] spark pull request #23095: [SPARK-23886][SS] Update query status for Continu...

2018-11-20 Thread gaborgsomogyi
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/23095

[SPARK-23886][SS] Update query status for ContinuousExecution

## What changes were proposed in this pull request?

Added query status updates to ContinuousExecution.

## How was this patch tested?

Existing unit tests + added ContinuousQueryStatusAndProgressSuite.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborgsomogyi/spark SPARK-23886

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23095.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23095


commit 5b743abcaeea010e166d20f849af99f3ad5b842a
Author: Gabor Somogyi 
Date:   2018-11-19T16:02:29Z

[SPARK-23886][SS] Update query status for ContinuousExecution




---




[GitHub] spark pull request #23092: [SPARK-26094][CORE][STREAMING] createNonEcFile cr...

2018-11-20 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23092#discussion_r234919472
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -471,7 +471,11 @@ object SparkHadoopUtil {
 try {
  // Use reflection as this uses apis only available in hadoop 3
   val builderMethod = fs.getClass().getMethod("createFile", 
classOf[Path])
-  val builder = builderMethod.invoke(fs, path)
+  // the builder api does not resolve relative paths, nor does it 
create parent dirs, while
+  // the old api does.
+  fs.mkdirs(path.getParent())
--- End diff --

Some error handling would be good if `mkdirs` returns false; a possible check is sketched below.
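
For illustration, the check might look like the following sketch (not the actual patch):
```
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

// Surface a mkdirs failure instead of silently falling through to createFile.
def mkdirsOrThrow(fs: FileSystem, path: Path): Unit = {
  if (!fs.mkdirs(path.getParent)) {
    throw new IOException(s"Failed to create parent directory ${path.getParent}")
  }
}
```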


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r234543403
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

I've spoken with them, and they confirmed the same thing that is stated in the official API documentation: https://kafka.apache.org/documentation/#security_sasl_kerberos
There is no such possibility.


---




[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.

2018-11-16 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22598
  
retest this, please


---




[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.

2018-11-16 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22598
  
retest this, please


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r234043117
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

OK, speaking with them...


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r234037129
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

I've just double-checked my knowledge: in this case Kafka looks for a JAAS entry but will not find it, so it throws this exception:
```
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry
in the JAAS configuration. System property 'java.security.auth.login.config' is not set
```
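
For reference, a minimal JAAS file that would satisfy this lookup; the contents are illustrative and would be supplied via `-Djava.security.auth.login.config=/path/to/jaas.conf`:
```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  serviceName="kafka";
};
```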



---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233934316
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === plainSecurityProtocol)
+assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should take over 
truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === sslSecurityProtocol)
+assert(adminClientProperties.get("ssl.truststore.location") === 
trustStoreLocation)
+assert(adminClientProperties.get("ssl.truststore.password") === 
trustStorePassword)
+  }
+
+  test("createAdminClientProperties without keytab should not set dynamic 
jaas config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233898001
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

This code doesn't try to log in but provides dynamic JAAS configuration to Kafka. In the original implementation only file-based JAAS was possible; now I've extended it to make things easier for the user.

> why is that necessary?

This eliminates the need to provide file based jaas configuration.
Old way: https://github.com/gaborgsomogyi/spark-structured-secure-kafka-app
New way: 
https://github.com/gaborgsomogyi/spark-structured-secure-dt-kafka-app
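
The new way boils down to passing the same JAAS entry inline through Kafka's `sasl.jaas.config` property instead of a separate file. A minimal sketch in Scala (the keytab path and principal are placeholders, not values from the PR):
```
import java.util.Properties

import org.apache.kafka.common.config.SaslConfigs

val props = new Properties
// The same Krb5LoginModule entry a jaas.conf file would contain, passed inline.
props.put(SaslConfigs.SASL_JAAS_CONFIG,
  "com.sun.security.auth.module.Krb5LoginModule required " +
    """useKeyTab=true serviceName="kafka" """ +
    """keyTab="/path/to/keytab" principal="user@DOMAIN.COM";""")
```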

> won't this fail

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233888412
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === plainSecurityProtocol)
+assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should take over 
truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === sslSecurityProtocol)
+assert(adminClientProperties.get("ssl.truststore.location") === 
trustStoreLocation)
+assert(adminClientProperties.get("ssl.truststore.password") === 
trustStorePassword)
+  }
+
+  test("createAdminClientProperties without keytab should not set dynamic 
jaas config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858832
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === plainSecurityProtocol)
+assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should take over 
truststore config") {
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858781
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858643
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858735
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858687
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858574
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.language.existentials
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  logDebug("Attempting to fetch Kafka security token.")
+  val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
+  creds.addToken(token.getService, token)
+  return Some(nextRenewalDate)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+None
+  }
+
+  override def delegationTokensRequired(
+  sparkConf: SparkConf,
+  hadoopConf: Configuration): Boolean = {
+sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined &&
--- End diff --

Fixed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233857377
  
--- Diff: core/pom.xml ---
@@ -408,6 +408,19 @@
   provided
 
 
+

[GitHub] spark issue #22331: [SPARK-25331][SS] Make FileStreamSink ignore partitions ...

2018-11-14 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22331
  
I've taken a look and I think the issue is solved in the mentioned PR but not yet documented. If somebody would like to consume the output directory of a Spark application which uses a file sink (with exactly-once semantics), they must read the metadata first to get the list of valid files.

Considering this, the PR can be closed.
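
As an illustration, a minimal sketch (assuming the sink wrote Parquet and given an existing `SparkSession` named `spark`; the path is a placeholder): reading the output back with Spark itself consults the metadata automatically, so only committed files are returned.
```
// Spark's batch reader detects the file sink's _spark_metadata log in the
// output directory and filters out files never committed by a batch.
val df = spark.read.parquet("/path/to/file/sink/output")
df.show()
```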



---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r232033107
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

Using full reflection would make the code terribly complicated, but your other suggestion about the provided scope is really good. This way the copy-paste can be avoided. Changed accordingly.
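
For reference, the provided-scope idea looks roughly like this in core/pom.xml (a sketch, not the exact PR diff):
```
<!-- Compile against kafka-clients without bundling it into spark-core;
     the jar is expected to be provided on the classpath at runtime. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <scope>provided</scope>
</dependency>
```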


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-07 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r231458881
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

In general, if we're not moving this class into the Kafka area, the only change which has to be done is to add another package from where the `TokenUtil` can be loaded. Otherwise `KafkaDelegationTokenProvider` has to be copied from `spark-sql-kafka...` to `spark-streaming-kafka...`. I think leaving it as it is keeps things simpler.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r231126695
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

That will make `KafkaDelegationTokenProvider` cleaner but has a couple of drawbacks:
 * It would just move the reflection into `HadoopDelegationTokenManager`
 * It would have to be copied when delegation tokens are introduced in DStreams



---




[GitHub] spark issue #22882: [SPARK-25871][STREAMING][WIP] Don't use EC for streaming...

2018-11-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22882
  
The problem I understand; my question is more like why [98204e6](https://github.com/apache/spark/commit/98204e6bcb840f1a47e1a3bd73da5fd7c9b22bcd) alone is not enough in the PR?


---




[GitHub] spark issue #22882: [SPARK-25871][STREAMING][WIP] Don't use EC for streaming...

2018-11-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22882
  
I know it's WIP, but I'm just wondering why the whole patch is needed?


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r231042582
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
--- End diff --

Are there news on this?


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r231041173
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

Checking it...


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r230683713
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
 val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) 
++
   safeCreateProvider(new HiveDelegationTokenProvider) ++
-  safeCreateProvider(new HBaseDelegationTokenProvider)
+  safeCreateProvider(new HBaseDelegationTokenProvider) ++
+  safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --

We've considered turning it off by default and came to the conclusion Marcelo described, which I think still stands.

The PR says documentation is not covered because the design can change. My plan is to add it in a separate PR when the feature is merged.

Related to what to document: I think the Kafka integration guide should cover everything. It already covers that the jar should be on the path.



---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r230680318
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
+  sparkConf: SparkConf,
+  hadoopConf: Configuration): Boolean = {
+sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined &&
+  sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL")
--- End diff --

This condition means
`if (bootstrap servers configured and (protocol == SASL_PLAINTEXT or protocol == SASL_SSL))`
Why do you think SSL is not covered?
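
A quick illustration of why the prefix check is enough here:
```
// Both SASL protocols share the "SASL" prefix; plain PLAINTEXT/SSL do not,
// so startsWith("SASL") selects exactly SASL_PLAINTEXT and SASL_SSL.
Seq("SASL_PLAINTEXT", "SASL_SSL").forall(_.startsWith("SASL"))  // true
Seq("PLAINTEXT", "SSL").exists(_.startsWith("SASL"))            // false
```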



---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r230679147
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
--- End diff --

That's a nice catch, fixing it.
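
For reference, the shape of the fix can be seen in the later revision quoted earlier in this thread: the renewal date gets propagated instead of `None` being returned unconditionally. Roughly:
```
// Sketch of the fixed obtainDelegationTokens body (per the later revision):
val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
creds.addToken(token.getService, token)
Some(nextRenewalDate)
```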


---




[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.

2018-10-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22598
  
retest this, please


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r226659261
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+ 

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r226568611
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
--- End diff --

The possibility is there but it doesn't throw an exception.
Input: `dateFormat.format(-1)`
Output: `1970-01-01T00:59`
It will end up in an invalid token, which can be found out from the log printouts.
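
This is easy to verify in isolation (a standalone sketch; the exact output depends on the JVM's default time zone, `00:59` corresponding to a CET-like offset):
```
import java.text.SimpleDateFormat

val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
// DateFormat.format accepts a Number as epoch millis, so -1 never throws.
println(dateFormat.format(-1L))  // e.g. 1970-01-01T00:59
```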



---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r225111947
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+ 
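
To make the two login options listed in that comment concrete, here is a hedged sketch (the service name, keytab path and principal are placeholders, not values from the PR):

```scala
import java.util.Properties
import org.apache.kafka.common.config.SaslConfigs

// Sketch under assumptions: when a keytab is provided, Kafka's dynamic JAAS
// configuration is set as a single client property; otherwise the JVM-global
// JAAS file applies and nothing extra is needed.
def configureLogin(props: Properties, keytab: Option[String]): Unit = keytab match {
  case Some(kt) =>
    props.put(SaslConfigs.SASL_JAAS_CONFIG,
      s"""com.sun.security.auth.module.Krb5LoginModule required
         | useKeyTab=true
         | serviceName="kafka"
         | keyTab="$kt"
         | principal="user@EXAMPLE.COM";""".stripMargin.replace("\n", ""))
  case None =>
    // No keytab -> the JVM was presumably started with
    //   -Djava.security.auth.login.config=/path/to/jaas.conf
    // so no additional property has to be put here.
}
```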

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r225111758
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+ 

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r225109193
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -647,4 +647,42 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED =
+ConfigBuilder("spark.kafka.delegation.token.enabled")
+  .doc("Set to 'true' for obtaining delegation token from kafka.")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
+ConfigBuilder("spark.kafka.bootstrap.servers")
--- End diff --

It exists and is described in the SPIP: `spark.security.credentials.kafka.enabled`.
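
For illustration, a hedged sketch of how a user would turn the feature on with that flag (the broker address is a placeholder):

```scala
import org.apache.spark.SparkConf

// Placeholder values; the flag name follows the SPIP mentioned above.
val conf = new SparkConf()
  .set("spark.security.credentials.kafka.enabled", "true")
  .set("spark.kafka.bootstrap.servers", "broker1.example.com:9096")
```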


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-10 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223987644
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/TokenUtilSuite.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class TokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  TokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+val adminClientProperties = 
TokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  .equals(bootStrapServers))
--- End diff --

No particular reason, changed.
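
For reference, a sketch of the changed assertion in the `===` style, reusing the names from the quoted test:

```scala
// ScalaTest's === reports both sides on failure, unlike a bare .equals check.
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === bootStrapServers)
```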


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-10 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223987456
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.kafka.common.security.scram.ScramLoginModule
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object KafkaSecurityHelper extends Logging {
+  def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = {
+val keytab = sparkConf.get(KEYTAB)
+if (keytab.isDefined) {
+  val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
+  require(serviceName.nonEmpty, "Kerberos service name must be 
defined")
+  val principal = sparkConf.get(PRINCIPAL)
+  require(principal.nonEmpty, "Principal must be defined")
+
+  val params =
+s"""
+|${getKrb5LoginModuleName} required
+| useKeyTab=true
+| serviceName="${serviceName.get}"
+| keyTab="${keytab.get}"
+| principal="${principal.get}";
+""".stripMargin.replace("\n", "")
+  logDebug(s"Krb JAAS params: $params")
+  Some(params)
+} else {
+  None
+}
+  }
+
+  private def getKrb5LoginModuleName(): String = {
--- End diff --

Added.
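
The method in question resolves the JVM-vendor-specific Kerberos login module; a sketch of what it plausibly looks like (mirroring Hadoop's `KerberosUtil`, so treat the exact class-name strings as assumptions):

```scala
object KerberosModuleResolver {
  // IBM JDKs ship their own Krb5LoginModule; other vendors use the Sun/Oracle one.
  def getKrb5LoginModuleName(): String = {
    if (System.getProperty("java.vendor").contains("IBM")) {
      "com.ibm.security.auth.module.Krb5LoginModule"
    } else {
      "com.sun.security.auth.module.Krb5LoginModule"
    }
  }
}
```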


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.

2018-10-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/22598
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223354399
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+  "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+val tokenInfo = token.tokenInfo
+log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+  tokenInfo.tokenId,
+  tokenInfo.owner,
+  tokenInfo.renewersAsString,
+  dateFormat.format(tokenInfo.issueTimestamp),
+  dateFormat.format(tokenInfo.expiryTimestamp),
+  dateFormat.format(tokenInfo.maxTimestamp)))
+  }
+
+  def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+
+  val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION)
+  if (truststoreLocation.nonEmpty) {
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation.get)
+  } else {
+logInfo("No truststore location set for SSL.")
+  }
+
+  val truststorePassword = sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD)
+  if (truststorePassword.nonEmpty) {
--- End diff --

Fixed.
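
The fix collapses the if/else pairs into `Option.foreach`, as the later revision quoted earlier in this thread shows; a condensed sketch reusing the names from the quoted diff:

```scala
// Only set the truststore properties when the corresponding Option is defined.
sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
  adminClientProperties.put("ssl.truststore.location", truststoreLocation)
}
sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
  adminClientProperties.put("ssl.truststore.password", truststorePassword)
}
```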


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223354478
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+  "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+val tokenInfo = token.tokenInfo
+log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+  tokenInfo.tokenId,
+  tokenInfo.owner,
+  tokenInfo.renewersAsString,
+  dateFormat.format(tokenInfo.issueTimestamp),
+  dateFormat.format(tokenInfo.expiryTimestamp),
+  dateFormat.format(tokenInfo.maxTimestamp)))
+  }
+
+  def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+
+  val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION)
+  if (truststoreLocation.nonEmpty) {
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation.get)
+  } else {
+logInfo("No truststore location set for SSL.")
+  }
+
+  val truststorePassword = sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD)
+  if (truststorePassword.nonEmpty) {
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword.get)
+  } else {
+logInfo("No truststore password set for SSL.")
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider it's security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
