[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-12-10 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23156
  
@gaborgsomogyi No problem :) When you get some time, please take a look at my other PRs as well.





[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-12-10 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23156
  
I think @jose-torres previously led the feature.





[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-12-10 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23156
  
I'd rather not jump into anything regarding continuous mode until the overall design of continuous mode (including aggregation and join) is clarified and stabilized.





[GitHub] spark issue #23260: [SPARK-26311][YARN] New feature: custom log URL for stdo...

2018-12-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23260
  
@srowen 
For now the executor log URL is **static** in Spark, which forces the NodeManager to stay alive even after the application has finished, in order to serve executor logs in the SHS.

This becomes a problem when decommissioning happens fairly frequently, or when end users want some elasticity for the YARN cluster (not only decommissioning nodes, but also elasticity of the YARN cluster itself - YARN has a cluster ID for the RM which identifies the cluster and can be leveraged when dealing with multiple YARN clusters).

A similar change has also been applied on the Hadoop side:

https://github.com/apache/hadoop/commit/5fe1dbf1959976d0dc5a8e614dd74836cfbee04c

We are experimenting with a central log service which resolves the above situation. Since the log URL for a centralized log service can't be the same URL as the NM webapp, we need flexibility in the executor log URL.

Hope this explains the rationale well.





[GitHub] spark issue #23169: [SPARK-26103][SQL] Limit the length of debug strings for...

2018-12-08 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23169
  
Thanks for addressing the review comments. It looks great overall.

We may want to document the new config so that we can guide end users to set the value lower when they suffer from memory pressure due to long physical plans in UI pages.
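
For example, end users could then lower it at session creation time. A minimal sketch, assuming the config name currently proposed in this PR (`spark.sql.debug.maxPlanLength`), which may still change:

```
import org.apache.spark.sql.SparkSession

// Minimal sketch: lower the plan-string limit for a memory-constrained application.
// The config name is the one proposed in this PR and may still change.
val spark = SparkSession.builder()
  .appName("plan-string-limit-example")
  .config("spark.sql.debug.maxPlanLength", "10000")
  .getOrCreate()
```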





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-08 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
retest this, please





[GitHub] spark pull request #23260: [SPARK-26311][YARN] New feature: custom log URL f...

2018-12-07 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/23260

[SPARK-26311][YARN] New feature: custom log URL for stdout/stderr

## What changes were proposed in this pull request?

This patch proposes adding a new configuration for YARN mode: a custom log URL. This enables end users to point application logs to an external log service, which makes it possible to serve logs even when the NodeManager becomes unavailable.

Some pre-defined patterns are available for the custom log URL so they can be specified like path variables.

## How was this patch tested?

Manual test. 

Below run changes executor log URLs in UI pages.

```
./bin/spark-submit --conf spark.yarn.custom.log.url="{{HttpScheme}}{{NodeHttpAddress}}/test/cluster/{{ClusterId}}/container/{{ContainerId}}/user/{{User}}/filename/{{FileName}}" --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-.jar
```

An example stdout log URL is below:


`http://node-address:node-port`/test/cluster/`workload1`/container/`container_e08_1542798098040_0012_01_02`/user/`spark`/filename/`stdout`


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-26311

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23260.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23260


commit 65cc6a32729cccba340f66c766c7255be4d7f356
Author: Jungtaek Lim (HeartSaVioR) 
Date:   2018-12-08T06:32:46Z

[SPARK-26311][YARN] New feature: custom log URL for stdout/stderr







[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@zsxwing Please also take a look: I believe I addressed the glob overlap issue as well.





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@gaborgsomogyi @steveloughran 
Please take a look at 17b9b9a043ead0d448048c88b30f544228bd230b, which simply leverages GlobFilter. You may find that when the depth of the archive path is more than 2, there's no chance for the final destination to be picked up by FileStreamSource: in most usual cases overlap will not happen, and Spark can determine this by comparing depths only.





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
I'm now also playing with Hadoop's glob-related classes to check whether the final destination matches the source path glob pattern or not.

* Looks like we can leverage `GlobPattern`, but it is marked as `@Private`.
* `GlobFilter` is `@Public`, but it only checks against `path.getName()`, so it would only compare the last component. If we would like to leverage it, we should split all components and compare with multiple filters.

Will update the code and tests once I find a viable approach.
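
For reference, a rough sketch of the per-component idea (illustrative only, not code from this PR; `matchesGlobPerComponent` is a hypothetical helper and assumes both paths are already normalized):

```
import org.apache.hadoop.fs.{GlobFilter, Path}

// Split both the glob pattern and the candidate path into components and run
// GlobFilter (which only matches the last component of a path) on each pair.
// Assumes the pattern and the candidate have the same depth.
def matchesGlobPerComponent(pattern: Path, candidate: Path): Boolean = {
  def components(p: Path): Seq[String] =
    p.toUri.getPath.split(Path.SEPARATOR).filter(_.nonEmpty).toSeq

  val patternParts = components(pattern)
  val candidateParts = components(candidate)

  patternParts.length == candidateParts.length &&
    patternParts.zip(candidateParts).forall { case (glob, name) =>
      new GlobFilter(glob).accept(new Path(name))
    }
}
```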





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@gaborgsomogyi 
That's really huge... Could you share how you tested? For example, which FS (local/HDFS/S3/etc.), the directory structure, the count of files... That would help me understand the impact and also help with manual testing when we work on optimization.





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@gaborgsomogyi @steveloughran 
OK. I'll change the approach to just check against the final path for each move. As @steveloughran stated, it may bring a performance hit for each check when dealing with object stores, so we may also need to provide a way to disable the check, with caution. (Btw, if moving a file in an object store incurs huge overhead compared to globbing, slow globbing may not be a big deal.)





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-04 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@gaborgsomogyi @steveloughran 
`GlobExpander` looks like it only handles the `{}` pattern. We still need to deal with `*` and `?`, which can't be expanded like this.

It would only work if we were OK with restricting descendants of multiple paths (for now we restrict descendants of one path), so while it would help fix the bug in the current patch, it might still be too restrictive.

I think changing the Hadoop version because of this costs too much. If we really want to go that way, the only viable solution is copying the code. (Actually we could also just reimplement it, since its requirements are like a kind of assignment, though we may end up with similar code.)





[GitHub] spark issue #23169: [SPARK-26103][SQL] Limit the length of debug strings for...

2018-12-03 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23169
  
@DaveDeCaprio 

You might have missed rolling back the change in the test.

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99632/testReport/org.apache.spark.sql.catalyst.trees/TreeNodeSuite/treeString_limits_plan_length/

I also think you need to add a new test that sets the configuration to some value and checks whether it works properly.





[GitHub] spark issue #23169: [SPARK-26103][SQL] Limit the length of debug strings for...

2018-12-03 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23169
  
retest this, please





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-03 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@zsxwing @gaborgsomogyi 
What we were trying to do is enforce the archive path so that moved files will not overlap with the source path. There may be the same file name in different directories, so I'm also trying to preserve each file's own path in the final archived path, which means archived files will not all be placed in the same directory.

Based on the above, I thought enforcing the archive path by checking the glob path is not easy to do, because without knowing the final archive path (per file) we can't check whether it matches the glob pattern. That's why I would rather restrict all subdirectories instead of finding a way to check against the glob pattern.

Actually I'm a bit afraid that we might be putting too much complexity into enforcing the archive path. If we are OK with not enforcing the archive path and just verifying that the final archive path doesn't overlap the source path for each source file, it would be simple to do. We can make Spark not move the file and log a warning message to let end users specify another directory.

Would like to hear everyone's thoughts and ideas. Thanks in advance!





[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-02 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238090190
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticate
+clients through certificates. Please note 2-way authentication must be 
enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos 
used for authentication but
+because there is no encryption it's only for testing purposes.
+
+  After delegation token successfully obtained Spark spreads it across 
nodes and renews it accordingly.
+  Delegation token uses `SCRAM` login module for authentication.
+
+  When delegation token is available for example on an executor it can be 
overridden with JAAS login
--- End diff --

nit: Removing `for example` sounds clearer.





[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-02 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238089426
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticate
--- End diff --

nit: authenticate -> authenticate`s`





[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-02 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238090666
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticate
+clients through certificates. Please note 2-way authentication must be 
enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos 
used for authentication but
+because there is no encryption it's only for testing purposes.
+
+  After delegation token successfully obtained Spark spreads it across 
nodes and renews it accordingly.
--- End diff --

nit: `After obtaining delegation token successfully,` sounds more natural to me, but just adding a `,` before `Spark` would also be fine.





[GitHub] spark pull request #23195: [SPARK-26236][SS] Add kafka delegation token supp...

2018-12-02 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/23195#discussion_r238090476
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -624,3 +624,57 @@ For experimenting on `spark-shell`, you can also use 
`--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more 
details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increases security in a 
cluster. For detailed
+description about these possibilities, see [Kafka security 
docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**: This way the 
application can be configured
+  via Spark parameters and may not need JAAS login configuration (Spark 
can use Kafka's dynamic JAAS
+  configuration feature). For further information about delegation tokens, 
see
+  [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+  The process is initiated by Spark's Kafka delegation token provider. 
This is enabled by default
+  but can be turned off with `spark.security.credentials.kafka.enabled`. 
When
+  `spark.kafka.bootstrap.servers` set Spark looks for authentication 
information in the following
+  order and choose the first available to log in:
+  - **JAAS login configuration**
+  - **Keytab file**, such as,
+
+./bin/spark-submit \
+--keytab  \
+--principal  \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  - **Kerberos credential cache**, such as,
+
+./bin/spark-submit \
+--conf spark.kafka.bootstrap.servers= \
+...
+
+  Spark supports the following authentication protocols to obtain token:
+  - **SASL SSL (default)**: With `GSSAPI` mechanism Kerberos used for 
authentication and SSL for encryption.
+  - **SSL**: It's leveraging a capability from SSL called 2-way 
authentication. The server authenticate
+clients through certificates. Please note 2-way authentication must be 
enabled on Kafka brokers.
+  - **SASL PLAINTEXT (for testing)**: With `GSSAPI` mechanism Kerberos 
used for authentication but
+because there is no encryption it's only for testing purposes.
+
+  After delegation token successfully obtained Spark spreads it across 
nodes and renews it accordingly.
+  Delegation token uses `SCRAM` login module for authentication.
+
+  When delegation token is available for example on an executor it can be 
overridden with JAAS login
+  configuration.
+- **JAAS login configuration**: JAAS login configuration must be created 
and transferred to all
--- End diff --

nit: `transferred` sounds like end users have to send the files themselves. `placed` sounds more general, allowing the JAAS conf file to be made available in any way.





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-01 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@zsxwing 
Yeah, it would be ideal if we could enforce an `archivePath` that has no possibility of matching the source path (glob). So my approach was to find the base directory that has no glob in any ancestor, and ensure that `archive path + base directory of source path` does not fall under a subdirectory of that directory.

For example, suppose the source path is `/a/b/c/*/ef?/*/g/h/*/i`; then the base directory of the source path would be `/a/b/c`, and `archive path + base directory of source path` should not fall under a subdirectory of `/a/b/c`.
(My code has a bug in finding that directory, so I need to fix it.)

This is not an elegant approach and it has false positives, ending up restricting archive paths which actually don't overlap (too restrictive), but it would guarantee the two paths never overlap. (So there is no need to re-check when renaming a file.)

I guess the approach might be reasonable because in practice end users would avoid having to think about complicated overlap cases and just isolate the two paths.

What do you think about this approach?

cc. @gaborgsomogyi Could you also help validate my approach?
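
To illustrate the idea, here is a rough sketch of the check described above (names are illustrative and this is not the code in the patch; it strips every component from the first glob-containing one onward, which also sidesteps the bug mentioned above):

```
import org.apache.hadoop.fs.Path

object ArchivePathCheck {
  private val globChars = "*?[{"

  // Deepest ancestor of the source pattern with no glob characters in any component,
  // e.g. "/a/b/c/*/ef?/*/g/h/*/i" -> "/a/b/c".
  def globFreeBase(sourceGlob: Path): Path = {
    val comps = sourceGlob.toUri.getPath.split(Path.SEPARATOR).filter(_.nonEmpty)
    val base = comps.takeWhile(c => !c.exists(ch => globChars.indexOf(ch) >= 0))
    new Path(Path.SEPARATOR + base.mkString(Path.SEPARATOR))
  }

  // Archived files keep their own path under the archive dir, so files picked up
  // from the source land under archiveDir + base. Reject the archive dir when that
  // location would fall back inside the source's glob-free base directory.
  def wouldOverlap(sourceGlob: Path, archiveDir: Path): Boolean = {
    val base = globFreeBase(sourceGlob).toUri.getPath
    val archivedBase = archiveDir.toUri.getPath + base
    archivedBase == base || archivedBase.startsWith(base + Path.SEPARATOR)
  }
}

// ArchivePathCheck.wouldOverlap(new Path("/a/b/c/*/ef?/*/g/h/*/i"), new Path("/a/b/c/archived"))  // true
// ArchivePathCheck.wouldOverlap(new Path("/a/b/c/*/ef?/*/g/h/*/i"), new Path("/archived/here"))   // false
```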





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-11-29 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@zsxwing Btw, what do you think about addressing background move/deletion (which I had been thinking about and @gaborgsomogyi also suggested) in a separate issue? I guess adding more features would make you spend more time on review.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237481604
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +289,65 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
+  val curPathUri = curPath.toUri
+
+  val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+  try {
+logDebug(s"Creating directory if it doesn't exist 
${newPath.getParent}")
+if (!fs.exists(newPath.getParent)) {
+  fs.mkdirs(newPath.getParent)
+}
+
+logDebug(s"Archiving completed file $curPath to $newPath")
+fs.rename(curPath, newPath)
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being 
stopped
+  logWarning(s"Fail to move $curPath to $newPath / skip moving 
file.", e)
+  }
+}
+
+def remove(entry: FileEntry): Unit = {
+  val curPath = new Path(entry.path)
--- End diff --

I just modified the existing UT to have a space and `%` in the directory name as well as the file name.





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-11-29 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@zsxwing Thanks for the detailed review! Addressed review comments.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237342362
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +289,65 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
--- End diff --

Will address.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237342346
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -100,6 +101,36 @@ class FileStreamSource(
 
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = 
$maxFileAgeMs")
 
+  ensureNoOverlapBetweenSourceAndArchivePath()
+
+  private def ensureNoOverlapBetweenSourceAndArchivePath(): Unit = {
+@tailrec
+def removeGlob(path: Path): Path = {
+  if (path.getName.contains("*")) {
+removeGlob(path.getParent)
+  } else {
+path
+  }
+}
+
+sourceOptions.sourceArchiveDir match {
+  case None =>
+  case Some(archiveDir) =>
+val sourceUri = removeGlob(qualifiedBasePath).toUri
+val archiveUri = new Path(archiveDir).toUri
+
+val sourcePath = sourceUri.getPath
+val archivePath = archiveUri.getPath
--- End diff --

Nice finding. Will address.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237342072
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +289,65 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
+  val curPathUri = curPath.toUri
+
+  val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+  try {
+logDebug(s"Creating directory if it doesn't exist 
${newPath.getParent}")
+if (!fs.exists(newPath.getParent)) {
+  fs.mkdirs(newPath.getParent)
+}
+
+logDebug(s"Archiving completed file $curPath to $newPath")
+fs.rename(curPath, newPath)
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being 
stopped
+  logWarning(s"Fail to move $curPath to $newPath / skip moving 
file.", e)
+  }
+}
+
+def remove(entry: FileEntry): Unit = {
+  val curPath = new Path(entry.path)
--- End diff --

Yeah... actually I was somewhat confused about whether I have to escape/unescape the path. Thanks for the suggestion. Will address and add a new unit test for it.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237341854
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +289,65 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
+  val curPathUri = curPath.toUri
+
+  val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+  try {
+logDebug(s"Creating directory if it doesn't exist 
${newPath.getParent}")
+if (!fs.exists(newPath.getParent)) {
+  fs.mkdirs(newPath.getParent)
+}
+
+logDebug(s"Archiving completed file $curPath to $newPath")
+fs.rename(curPath, newPath)
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being 
stopped
+  logWarning(s"Fail to move $curPath to $newPath / skip moving 
file.", e)
+  }
+}
+
+def remove(entry: FileEntry): Unit = {
+  val curPath = new Path(entry.path)
+  try {
+logDebug(s"Removing completed file $curPath")
+fs.delete(curPath, false)
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being 
stopped
+  logWarning(s"Fail to remove $curPath / skip removing file.", e)
+  }
+}
+
+val logOffset = FileStreamSourceOffset(end).logOffset
+metadataLog.get(logOffset) match {
--- End diff --

Ah, I didn't notice that. Thanks for letting me know! Will address.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237341425
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +289,65 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
+  val curPathUri = curPath.toUri
+
+  val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+  try {
+logDebug(s"Creating directory if it doesn't exist 
${newPath.getParent}")
+if (!fs.exists(newPath.getParent)) {
+  fs.mkdirs(newPath.getParent)
+}
+
+logDebug(s"Archiving completed file $curPath to $newPath")
+fs.rename(curPath, newPath)
--- End diff --

Yeah, I guess the patch prevents that case if it works as I expect, but I'm also in favor of defensive programming, and logging would be better for end users. Will address.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237340952
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
 ---
@@ -74,6 +76,39 @@ class FileStreamOptions(parameters: 
CaseInsensitiveMap[String]) extends Logging
*/
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
 
+  /**
+   * The archive directory to move completed files. The option will be 
only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory 
with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and 
the path of archive
+   * directory is "/archived/here", file will be moved to 
"/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are 
"archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+val modeStrOption = parameters.getOrElse("cleanSource", 
CleanSourceMode.NO_OP.toString)
--- End diff --

OK will address.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237340938
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -100,6 +101,36 @@ class FileStreamSource(
 
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = 
$maxFileAgeMs")
 
+  ensureNoOverlapBetweenSourceAndArchivePath()
--- End diff --

Ah yes, I missed it. Will address.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237340601
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+cleanSource: option to clean up completed files after 
processing.
+Available options are "archive", "delete", "no_op". If the option 
is not provided, the default value is "no_op".
+When "archive" is provided, additional option 
sourceArchiveDir must be provided as well. The value of 
"sourceArchiveDir" must be outside of source path, to ensure archived files are 
never included to new source files again.
+Spark will move source files respecting its own path. For example, 
if the path of source file is "/a/b/dataset.txt" and the path of archive 
directory is "/archived/here", file will be moved to 
"/archived/here/a/b/dataset.txt"
+NOTE: Both archiving (via moving) or deleting completed files 
would introduce overhead (slow down) in each micro-batch, so you need to 
understand the cost for each operation in your file system before enabling this 
option. On the other hand, enbling this option would reduce the cost to list 
source files which is considered as a heavy operation.
+NOTE 2: The source path should not be used from multiple queries 
when enabling this option, because source files will be moved or deleted which 
behavior may impact the other queries.
--- End diff --

Nice finding. I missed the case where multiple sources in the same query refer to the same file directory. Will address.





[GitHub] spark pull request #23169: [SPARK-26103][SQL] Limit the length of debug stri...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/23169#discussion_r237305318
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1610,6 +1610,12 @@ object SQLConf {
   """ "... N more fields" placeholder.""")
 .intConf
 .createWithDefault(25)
+
+  val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.debug.maxPlanLength")
--- End diff --

I'm not sure `debug` is right. This patch should help the UI reduce memory usage, which is not debugging. If we specify `debug` here, end users would interpret it as if there were a debug mode. The same goes for the description.





[GitHub] spark pull request #23169: [SPARK-26103][SQL] Limit the length of debug stri...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/23169#discussion_r237304214
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.io.Writer
+
+class WriterSizeException(val attemptedSize: Long, val charLimit: Long) 
extends Exception(
+  s"Attempted to write $attemptedSize characters to a writer that is 
limited to $charLimit")
+
+/**
+ * This class is used to control the size of generated writers.  
Guarantees that the total number
+ * of characters written will be less than the specified size.
+ *
+ * Checks size before writing and throws a WriterSizeException if the 
total size would count the
+ * limit.
+ */
+class SizeLimitedWriter(underlying: Writer, charLimit: Long) extends 
Writer {
+
+  var charsWritten: Long = 0
+
+  override def write(cbuf: Array[Char], off: Int, len: Int): Unit = {
--- End diff --

I'd rather make this write as much of the content as possible (with `...`), or let WriterSizeException contain relevant information to help the caller call this again with a smaller length.

In the worst case, if the first physical plan is huge we end up only showing `...`.
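
For illustration, a minimal sketch of the first alternative (write as much content as fits, then append `...`); the class name is hypothetical and this is not the code in the PR:

```
import java.io.Writer

// Hypothetical variant: instead of throwing once the limit is hit, keep as much
// of the content as fits and append "..." to mark the truncation.
class TruncatingWriter(underlying: Writer, charLimit: Long) extends Writer {
  private var charsWritten: Long = 0L
  private var truncated = false

  override def write(cbuf: Array[Char], off: Int, len: Int): Unit = {
    if (truncated) return
    val remaining = charLimit - charsWritten
    if (len <= remaining) {
      underlying.write(cbuf, off, len)
      charsWritten += len
    } else {
      // Write what still fits, then mark the output as truncated.
      underlying.write(cbuf, off, remaining.toInt)
      underlying.write("...")
      charsWritten = charLimit
      truncated = true
    }
  }

  override def flush(): Unit = underlying.flush()
  override def close(): Unit = underlying.close()
}
```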





[GitHub] spark pull request #23169: [SPARK-26103][SQL] Limit the length of debug stri...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/23169#discussion_r237301700
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.io.Writer
+
+class WriterSizeException(val attemptedSize: Long, val charLimit: Long) 
extends Exception(
+  s"Attempted to write $attemptedSize characters to a writer that is 
limited to $charLimit")
+
+/**
+ * This class is used to control the size of generated writers.  
Guarantees that the total number
+ * of characters written will be less than the specified size.
+ *
+ * Checks size before writing and throws a WriterSizeException if the 
total size would count the
+ * limit.
+ */
+class SizeLimitedWriter(underlying: Writer, charLimit: Long) extends 
Writer {
+
+  var charsWritten: Long = 0
--- End diff --

Looks like it should not be exposed outside the class. Let's guard it as `private`.





[GitHub] spark pull request #23169: [SPARK-26103][SQL] Limit the length of debug stri...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/23169#discussion_r237309191
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala ---
@@ -202,6 +202,26 @@ package object util extends Logging {
   /** Shorthand for calling truncatedString() without start or end 
strings. */
   def truncatedString[T](seq: Seq[T], sep: String): String = 
truncatedString(seq, "", sep, "")
 
+  /** Whether we have warned about plan string truncation yet. */
+  private val planSizeWarningPrinted = new AtomicBoolean(false)
+
+  def withSizeLimitedWriter[T](writer: Writer)(f: (Writer) => T): 
Option[T] = {
+try {
+  val limited = new SizeLimitedWriter(writer, 
SQLConf.get.maxPlanStringLength)
+  Some(f(limited))
+}
+catch {
+  case e: WriterSizeException =>
+writer.write("...")
--- End diff --

nit: Would we want to restrict the string's length to maxPlanStringLength including `...`? I think exceeding the defined length by at most 3 chars is not a big deal, but just double checking.





[GitHub] spark pull request #23169: [SPARK-26103][SQL] Limit the length of debug stri...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/23169#discussion_r237307829
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
 ---
@@ -595,4 +596,14 @@ class TreeNodeSuite extends SparkFunSuite {
 val expected = Coalesce(Stream(Literal(1), Literal(3)))
 assert(result === expected)
   }
+
+  test("toString() tree depth") {
--- End diff --

`treeString` sounds right to me rather than `toString`.





[GitHub] spark issue #23142: [SPARK-26170][SS] Add missing metrics in FlatMapGroupsWi...

2018-11-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23142
  
cc. @tdas @zsxwing 





[GitHub] spark issue #23142: [SPARK-26170][SS] Add missing metrics in FlatMapGroupsWi...

2018-11-26 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23142
  
retest this, please





[GitHub] spark pull request #23142: [SPARK-26170][SS] Add missing metrics in FlatMapG...

2018-11-25 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/23142

[SPARK-26170][SS] Add missing metrics in FlatMapGroupsWithState

## What changes were proposed in this pull request?

This patch adds the metrics that StateStoreWriter can measure to FlatMapGroupsWithStateExec. Please note that some metrics, like the time to remove elements, are not addressed because they are coupled with the state function.

## How was this patch tested?

Manually tested with 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala.

Snapshots below:

![screen shot 2018-11-26 at 4 13 40 
pm](https://user-images.githubusercontent.com/1317309/48999346-b5f7b400-f199-11e8-89c7-8795f13470d6.png)
![screen shot 2018-11-26 at 4 13 54 
pm](https://user-images.githubusercontent.com/1317309/48999347-b5f7b400-f199-11e8-91ef-ef0b2f816b2e.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-26170

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23142.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23142


commit 56f39cc5838c3f609c8657639ac3a45991fde99f
Author: Jungtaek Lim (HeartSaVioR) 
Date:   2018-11-26T07:33:08Z

SPARK-26170 Add missing metrics in FlatMapGroupsWithState







[GitHub] spark issue #23103: [SPARK-26121] [Structured Streaming] Allow users to defi...

2018-11-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23103
  
LGTM. Btw, IMHO, the TODOs @zouzias described would be better addressed at once, since documentation is easily forgotten.





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-11-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@gaborgsomogyi 
Thanks for taking care of it, but I guess I can manage. I'll ask for help if I can't get back to this one.
This patch (latest change) hasn't gotten any feedback from committers yet, so let's not rush this and wait for it.





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-11-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@gaborgsomogyi 
Thanks for reviewing! I addressed your review comments except for asynchronous cleanup, which might be broken out into a separate issue.





[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-11-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@gaborgsomogyi 
Yeah, I also thought about that idea (commented above) but lost focus due to other tasks. Given that a smaller patch is easier to review and the current patch works well (except for the overhead of cleaning in the same thread), could we split this up and address it in another issue?





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r235632761
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+cleanSource: option to clean up completed files after 
processing.
+Available options are "archive", "delete", "no_op". If the option 
is not provided, the default value is "no_op".
+When "archive" is provided, additional option 
sourceArchiveDir must be provided as well. The value of 
"sourceArchiveDir" must be outside of source path, to ensure archived files are 
never included to new source files again.
--- End diff --

Yeah, I guess you're right. I'll add logic to check this during initialization of FileStreamSource.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r235632872
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
 ---
@@ -74,6 +76,43 @@ class FileStreamOptions(parameters: 
CaseInsensitiveMap[String]) extends Logging
*/
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
 
+  /**
+   * The archive directory to move completed files. The option will be 
only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory 
with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and 
the path of archive
+   * directory is "/archived/here", file will be moved to 
"/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are 
"archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+val modeStrOption = parameters.getOrElse("cleanSource", 
CleanSourceMode.NO_OP.toString)
+  .toUpperCase(Locale.ROOT)
+
+val matchedModeOpt = CleanSourceMode.values.find(_.toString == 
modeStrOption)
+if (matchedModeOpt.isEmpty) {
--- End diff --

Will address.





[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r235632809
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +258,64 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
+  val curPathUri = curPath.toUri
+
+  val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+  if (!fs.exists(newPath.getParent)) {
--- End diff --

Nice finding. Will address.





[GitHub] spark issue #23076: [SPARK-26103][SQL] Added maxDepth to limit the length of...

2018-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/23076
  
I'm seeing both sides of the need: while I think dumping the full plan into a file is a good feature for debugging a specific issue, retaining full plans for rendering them on UI pages has been a headache, and three related issues ([SPARK-23904](https://issues.apache.org/jira/browse/SPARK-23904), [SPARK-25380](https://issues.apache.org/jira/browse/SPARK-25380), [SPARK-26103](https://issues.apache.org/jira/browse/SPARK-26103)) were filed within 3 months, which doesn't look like something we can tell end users to work around.

One thing we should be aware of is that a huge plan is generated not only from nested joins, but also from lots of columns, like SPARK-23904. For SPARK-25380 we don't know which parts generate the huge plan. So it might be easier and more flexible to just truncate to a specific size rather than applying conditions.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

2018-11-16 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
@zsxwing @dongjoon-hyun @steveloughran 
Thanks all for the valuable feedback! I applied review comments.

While I covered the new feature with new UTs, I have yet to test this manually 
with HDFS. I'll find time to do a manual test next week. For cloud storages, TBH, 
it's not easy for me to test against them manually, so I'd like to lean on 
reviewers' eyes and experiences.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-11-16 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

2018-11-12 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r232869187
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
--- End diff --

This looks beyond the scope of this PR: we can address it in a separate PR. Could 
you raise another one?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

2018-11-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r231717554
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+
+renameCompletedFiles: whether to rename completed 
files in previous batch (default: false). If the option is enabled, input file 
will be renamed with additional postfix "_COMPLETED_". This is useful to clean 
up old input files to save space in storage.
--- End diff --

Totally agreed, and that matches option 3 I've proposed. And option 1 would not 
affect the critical path of a batch much, since rename operations will be enqueued 
and a background thread will take care of them.

For option 1, providing a guarantee makes things complicated. If we are OK with 
NOT guaranteeing that all processed files are renamed, we can do the renaming in 
the background (like option 1) without handling backpressure, and simply drop the 
requests in the queue with logging if its size goes beyond the threshold or the JVM 
is shutting down.
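
To make the "drop when full" idea concrete, a minimal sketch (all names are 
hypothetical):

    import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

    // Best-effort background renamer with a bounded queue. Requests that do not
    // fit are dropped (and would be logged in the real implementation).
    class BestEffortRenamer(maxPending: Int, rename: String => Unit) {
      private val queue = new LinkedBlockingQueue[String](maxPending)
      private val executor = Executors.newSingleThreadExecutor()

      executor.submit(new Runnable {
        override def run(): Unit = {
          try {
            while (true) {
              val path = queue.poll(100, TimeUnit.MILLISECONDS)
              if (path != null) rename(path)
            }
          } catch {
            case _: InterruptedException => // stop() was called; pending requests are dropped
          }
        }
      })

      // Returns false (dropping the request) when the queue is already full.
      def submit(path: String): Boolean = queue.offer(path)

      // Best effort on shutdown: stop the worker and drop whatever is still queued.
      def stop(): Unit = executor.shutdownNow()
    }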


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

2018-11-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r231695749
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+
+renameCompletedFiles: whether to rename completed 
files in previous batch (default: false). If the option is enabled, input file 
will be renamed with additional postfix "_COMPLETED_". This is useful to clean 
up old input files to save space in storage.
--- End diff --

@dongjoon-hyun 
For Storm, it renames the input file twice: 1. in process, 2. completed 
(actually the latter is not a rename, but a move to an archive directory). The HDFS 
spout was created in 2015, so I don't expect there was deep consideration of cloud 
storage.
For Flink I have no idea; I'll explore how they handle it.

I think the feature is simply essential in ETL situations: a comment in the JIRA 
clearly shows why the feature is needed.

https://issues.apache.org/jira/browse/SPARK-20568?focusedCommentId=16356929=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16356929


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

2018-11-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r231429484
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+
+renameCompletedFiles: whether to rename completed 
files in previous batch (default: false). If the option is enabled, input file 
will be renamed with additional postfix "_COMPLETED_". This is useful to clean 
up old input files to save space in storage.
--- End diff --

Hi @dongjoon-hyun, thanks for pointing out a good point! I was only concerned 
with the filesystem/HDFS case and am not familiar with cloud environments.

I guess we have a few possible options here:

1. Rename in a background thread.

For option 1, we may want to restrict the max number of files to enqueue, and when 
it reaches the max we may handle some of them synchronously. We also may need 
to postpone JVM shutdown until all enqueued files are renamed.

2. Provide an additional option: delete (the two options would be mutually 
exclusive; see the sketch after this comment).

Actually the actions end users are expected to take are 1. moving to an 
archive directory (with or without compression), or 2. deleting periodically. If 
moving/renaming requires non-trivial cost, end users may want to just delete 
files directly without backing them up.

3. Document the overhead in the option's description.

While we cannot say exactly how much the cost is, we can explain that 
the cleanup operation may affect the processing of a batch.

The options above are not mutually exclusive.

cc. to @steveloughran - I think you're an expert on cloud storage: could you 
provide your thoughts on this?
also cc. to @zsxwing in case this was missed.
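
If it helps the discussion, option 2 could end up looking roughly like this on 
the commit path (purely a sketch, names made up):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Mutually exclusive cleanup actions for a completed source file.
    sealed trait CompletedFileAction
    case object NoOp extends CompletedFileAction
    case class MoveToArchive(baseArchiveDir: String) extends CompletedFileAction
    case object Delete extends CompletedFileAction

    def cleanUp(fs: FileSystem, completed: Path, action: CompletedFileAction): Unit = action match {
      case NoOp => // keep the input file as-is
      case MoveToArchive(baseDir) =>
        val target = new Path(baseDir + completed.toUri.getPath)
        fs.mkdirs(target.getParent)
        fs.rename(completed, target)
      case Delete =>
        fs.delete(completed, false) // non-recursive delete of the single file
    }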


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

2018-11-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
cc. @zsxwing 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-11-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
@zsxwing 
Given that the Spark 2.4 vote has passed, could we revisit this and make progress?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

2018-11-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22952
  
I feel the patch is simple enough to skip manual verification against HDFS, but 
I'll try to spin up an HDFS cluster and test this manually.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

2018-11-05 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/22952

[SPARK-20568][SS] Rename files which are completed in previous batch

## What changes were proposed in this pull request?

This patch adds an option to rename files which are completed in the previous 
batch, so that end users can clean up processed files to save storage.

It only applies to "micro-batch", since for batch all input files must 
be kept to get the same result across multiple query executions.

## How was this patch tested?

Added UTs, manually tested locally on a Mac. (The logic is very simple, so I'm not 
sure we need to verify with HDFS manually.)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-20568

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22952.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22952


commit 8a1d2e187c667833b2de8eb4cba2fa04dca9c6ff
Author: Jungtaek Lim 
Date:   2018-11-05T04:32:51Z

SPARK-20568 Rename files which are completed in previous batch




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-11-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-11-01 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-10-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-10-21 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-10-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r225752604
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+ 

[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-10-12 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r224320793
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_DELEGATION_TOKEN_ENABLED, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
--- End diff --

Having a utility trait or a utility singleton object could reduce the 
duplication, but personally I'd be OK with allowing 5~10 lines of duplication. If we 
are likely to keep leveraging Scala reflection outside of catalyst 
(HBaseDelegationTokenProvider does it twice), we could have a utility 
class for that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r224323537
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -647,4 +647,42 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED =
+ConfigBuilder("spark.kafka.delegation.token.enabled")
+  .doc("Set to 'true' for obtaining delegation token from kafka.")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
+ConfigBuilder("spark.kafka.bootstrap.servers")
--- End diff --

And I would rather say it should be a flag to enable/disable delegation 
tokens. Not all end users who use a secured Kafka cluster want to leverage 
delegation tokens.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r224334764
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+ 

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r224322849
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -647,4 +647,42 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED =
+ConfigBuilder("spark.kafka.delegation.token.enabled")
+  .doc("Set to 'true' for obtaining delegation token from kafka.")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
+ConfigBuilder("spark.kafka.bootstrap.servers")
--- End diff --

While it is not possible to provide the relevant configuration to the source/sink, 
pre-defining Kafka-related configurations one by one here feels too coupled with 
Kafka to me.

It might also cause confusion about where to put configuration for the Kafka 
source/sink: this configuration must only be used for delegation tokens, but I 
can't tell that from either the configuration name or its doc.

My 2 cents is to just reserve a prefix like `spark.kafka.token` or similar, 
leave a comment, and not define anything here. I would like to hear what 
committers think about how to add external configurations to Spark conf.
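
To illustrate what I mean by reserving a prefix (just a sketch; 
`spark.kafka.token.` is a placeholder, not an agreed-upon name):

    import org.apache.spark.SparkConf

    // Pick up all delegation-token related settings by prefix instead of
    // pre-defining each Kafka config key in Spark's config package.
    def tokenProperties(sparkConf: SparkConf): Map[String, String] = {
      sparkConf.getAllWithPrefix("spark.kafka.token.").toMap
    }

    // e.g. "spark.kafka.token.bootstrap.servers" shows up here as "bootstrap.servers".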


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r224338353
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+ 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222840402
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    void call(Dataset<String> dataset, Long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the c

[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...

2018-10-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22633#discussion_r222832931
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
 ---
@@ -0,0 +1,64 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package test.org.apache.spark.sql.streaming;
+
+import java.io.File;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.test.TestSparkSession;
+import org.apache.spark.util.Utils;
+
+public class JavaDataStreamReaderWriterSuite {
+  private SparkSession spark;
+  private String input;
+
+  @Before
+  public void setUp() {
+spark = new TestSparkSession();
+input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"input").toString();
+  }
+
+  @After
+  public void tearDown() {
+Utils.deleteRecursively(new File(input));
+spark.stop();
+spark = null;
+  }
+
+  @Test
+  public void testForeachBatchAPI() {
--- End diff --

MINOR: I guess it will be duplicated effort across both the Scala and Java suites, 
but IMHO adding a sanity check wouldn't hurt much and would prevent further possible 
misses.
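
By sanity check I mean something as small as this (a sketch in the Scala suite, 
assuming `spark` is the test session; the rate source is just a convenient input):

    import java.util.concurrent.atomic.AtomicLong

    val seenRows = new AtomicLong(0L)
    val query = spark.readStream
      .format("rate").option("rowsPerSecond", "10").load()
      .writeStream
      .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
        seenRows.addAndGet(batchDF.count())
        ()
      }
      .start()

    query.processAllAvailable()
    query.stop()
    assert(seenRows.get() > 0, "foreachBatch function was never called with any data")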


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-10-01 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
@gaborgsomogyi Yeah... I'm just waiting for it. Btw I proposed a solution on 
SPARK-10816 as well, and it is also waiting for a response. I'm going to work on 
another item or review others' work so that I can avoid being blocked by the Spark 2.4 RC.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-09-30 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
Kindly asking for a review. Please don't mind this if you're busy fixing 
bugs for the Spark 2.4 RC.

@gaborgsomogyi  I guess I left two things to committer decision: 1. defining a 
soft boundary and logging when pooled objects exceed it, 2. documentation. 
Do you have more to review?
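
For reference, item 1 would be as small as a log when the pool goes over the soft 
capacity (a sketch against commons-pool2; `softMaxSize` stands for the config value):

    import org.apache.commons.pool2.impl.GenericKeyedObjectPool

    // Warn when the total number of pooled consumers exceeds the soft capacity.
    def warnIfExceedsSoftCapacity[K, V](pool: GenericKeyedObjectPool[K, V], softMaxSize: Int): Unit = {
      val total = pool.getNumActive + pool.getNumIdle
      if (total > softMaxSize) {
        // would be logWarning in the real code
        println(s"Pool exceeds its soft max size: total=$total, softMaxSize=$softMaxSize")
      }
    }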


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22579: [SPARK-25429][SQL] Use Set instead of Array to im...

2018-09-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22579#discussion_r221177626
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -83,7 +83,7 @@ class SQLAppStatusListener(
 // track of the metrics knows which accumulators to look at.
 val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
--- End diff --

If we are going to make it a Set, I guess we don't need to sort it, and we may also 
not need to convert it to a List and then to a Set again.

Does changing List to Set affect any content on the UI page? I just wanted to 
double-check why we have been sorting the accumulator ids, and whether this 
patch breaks that intention or not.
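
In other words, if ordering doesn't matter for the UI, this could probably just 
build the Set directly (a sketch with a stand-in for `exec.metrics`):

    // Only the distinct ids matter, so a Set can be built directly without
    // sorting or an intermediate List.
    case class Metric(accumulatorId: Long)
    val metrics = Seq(Metric(3L), Metric(1L), Metric(3L))
    val accumIds: Set[Long] = metrics.map(_.accumulatorId).toSet  // Set(3, 1)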


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-09-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
Just rebased.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-09-21 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
@gaborgsomogyi Totally makes sense. Let me address it while the patch is 
reviewed by committers. I may get recommendations to rename the config or more, 
so addressing documentation would be the last part.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22331: [SPARK-25331][SS] Make FileStreamSink ignore part...

2018-09-21 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22331#discussion_r219399313
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StagingFileCommitProtocol.scala
 ---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FileContext, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+
+class StagingFileCommitProtocol(jobId: String, path: String)
+  extends FileCommitProtocol with Serializable with Logging
+  with ManifestCommitProtocol {
+  private var stagingDir: Option[Path] = None
--- End diff --

Looks like you're using an Option but always calling `.get` without any checking. 
In `setupTask` it is fine since the assignment is placed there, but in 
`newTaskTempFile` we may be better off guarding with `require`, which achieves 
fail-fast and lets `.get` always succeed later.
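
Concretely, something along these lines (a sketch; the field mirrors the one in 
the diff and the method name is illustrative):

    import org.apache.hadoop.fs.Path

    // Guard the Option before using .get, so a misuse fails fast with a clear message.
    var stagingDir: Option[Path] = None

    def newTaskTempFilePath(filename: String): Path = {
      require(stagingDir.isDefined,
        "newTaskTempFile was called before setupTask; stagingDir is not initialized")
      new Path(stagingDir.get, filename)
    }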


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-09-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
According to the discussion on SPARK-10816, I'm holding off on further 
improvements and plan to discuss further on the JIRA issue. I guess anyone 
interested in this patch can still review it or try it out and share feedback.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r219367280
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetA

[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-09-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
If we are fine with ignoring the optimal delta of state, or OK with 
addressing it in a follow-up issue (it should be addressed in the same release 
version to avoid having state V1, V2, etc.), I think the only TODOs are writing 
javadoc as well as deduplicating some code.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-09-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
@arunmahadevan 
One thing we may want to be aware of is that the requirement is pretty different 
from other streaming frameworks like Flink, which normally set a long checkpoint 
interval and do a full snapshot (though Flink supports incremental checkpoints, 
which deal with minimizing the amount of stored data).

Here in Spark, we expect a smaller batch interval, and Spark deals 
with the requirement by storing the "delta" of state changes. This behavior brings 
concerns about the strategy of how we store and how we remove state.

Let's say we have 3 rows in a group in the batch result and there are also 3 rows 
in the same group in state, and we want to replace the state with the new batch 
result. For a full snapshot, removing 3 rows first and putting 3 rows may not 
matter much, but with the delta approach we should compare them side by side and 
apply fewer changes to state.

The difference is not trivial for session window, because arbitrary 
changes are required: for example, two different sessions in state can be 
merged later when late events come in, and then we have to overwrite one and 
remove the others. New sessions can be created alongside existing sessions, 
and we want to overwrite a session if the new output session originated from 
the old state, and append it if not. For other windows, it is just a "put" 
because there's no such grouping and we are safe to put (overwrite if any, and 
without eviction there's no need to remove). The different requirements between 
time window and session window are not easy to combine into one. (A rough 
sketch of this side-by-side comparison is at the end of this comment.)

That's how I realized the difficulty of the state part for session window, and 
that's why I feel I need to make changes on the streaming part. For the batch part 
the current patch is doing OK.

Btw, we can think of `AssignWindows` as `TimeWindowing` and 
`SessionWindowing`, as we logically assign rows to individual windows. So 
unless we would like to support custom windows like dynamic-gap session windows, 
I think we can address it later whenever needed.
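
To make the "compare side by side and bring fewer changes" point concrete, here 
is a rough sketch of the kind of diff I mean (sessions simplified to (start, end) 
pairs; nothing here is actual Spark code):

    // Given the old sessions in state and the new output sessions for one key,
    // compute the minimal delta instead of wiping and rewriting everything.
    case class Session(start: Long, end: Long)

    def stateDelta(oldSessions: Set[Session], newSessions: Set[Session])
        : (Set[Session], Set[Session]) = {
      val toRemove = oldSessions -- newSessions   // merged away or evicted sessions
      val toUpsert = newSessions -- oldSessions   // brand-new or changed (e.g. merged) sessions
      (toRemove, toUpsert)
    }

    // e.g. old = {(0,10), (12,20)}, new = {(0,20)} after a late event merges them:
    // delta = (remove {(0,10), (12,20)}, upsert {(0,20)}) rather than rewriting the key blindly.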


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-09-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
Please review the general approach and direction first. I'm planning to 
spend time rewriting the streaming part to tightly integrate the logic with state 
so that state updates are minimized.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-09-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-09-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-09-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-09-19 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22482
  
The patch is a bit huge, so I'm not sure whether we would be better off squashing 
the commits into one before reviewing.

Two TODOs are left, hence marking the patch as WIP, but it is close to being a 
complete patch:

1. Optimal implementation of state for session window.

It borrows the state implementation from streaming join since it fits the 
necessary concept of state for session window, but it may not be the optimal one, 
so I'm going to see whether we can have a better implementation.

2. Javadoc (maybe the structured streaming guide doc too?)

I didn't add javadoc yet to speed up the POC and actual development, but to 
complete the patch I guess I need to write javadoc for the new classes as well as 
methods (maybe).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22482: WIP - [SPARK-10816][SS] Support session window na...

2018-09-19 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/22482

WIP - [SPARK-10816][SS] Support session window natively

## What changes were proposed in this pull request?

This patch proposes native support of session window, like Spark has been 
supporting for time window.

Please refer to the attached doc in 
[SPARK-10816](https://issues.apache.org/jira/browse/SPARK-10816) for more 
details on the rationale, concepts, limitations, etc.

From the end users' point of view, the only change is the addition of a "session" SQL 
function. End users can define a query with session window by replacing the 
"window" function with the "session" function, and the "window" column with the 
"session" column. After that, the patch provides the same experience as time window.

Internally, this patch changes the physical plan of aggregation a bit: 
if the session function is used in a query, it sorts the input rows by 
"grouping keys" + "session", and merges overlapping sessions into one while 
applying aggregations, so it's like a sort-based aggregation but the unit of a 
group is grouping keys + session. (A rough sketch of the merge step is shown below.)
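
A rough sketch of the merge step mentioned above (per grouping key, with the 
(start, end) pairs already sorted by start; the gap handling and the folding of 
aggregation buffers are omitted):

    // Merge overlapping sessions within one grouping key. The real code also
    // merges the aggregation buffers while merging the bounds.
    def mergeSessions(sorted: Seq[(Long, Long)]): Seq[(Long, Long)] = {
      sorted.foldLeft(List.empty[(Long, Long)]) {
        case (Nil, session) => List(session)
        case ((curStart, curEnd) :: rest, (start, end)) if start <= curEnd =>
          (curStart, math.max(curEnd, end)) :: rest  // overlaps the current session: extend it
        case (acc, session) => session :: acc
      }.reverse
    }

    // mergeSessions(Seq((0L, 10L), (5L, 15L), (20L, 30L))) == Seq((0L, 15L), (20L, 30L))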

To handle late events, there are cases where multiple session windows co-exist 
per key and are not yet evicted. This patch handles that case by borrowing the 
state implementation from streaming join, which can handle multiple values for 
a given key.

## How was this patch tested?

Many UTs are added to verify session window queries for both batch and 
streaming.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-10816

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22482.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22482


commit a1af74611df7dd5b979fc1a288de96e0b3d415da
Author: Jungtaek Lim 
Date:   2018-09-04T23:10:47Z

WIP nothing worked, just recording the progress

commit be502485047283e203933a4d78e3b580a0c567df
Author: Jungtaek Lim 
Date:   2018-09-06T04:36:11Z

WIP not working yet... lots of implementations needed

commit 7c60c0ad922ddacf025ad4762b85d06ab7cb258f
Author: Jungtaek Lim 
Date:   2018-09-06T13:31:08Z

WIP Finished implementing UpdatingSessionIterator

commit 4e8c260a6e6b73b9bcd347ca242b8e77aedf8d1e
Author: Jungtaek Lim 
Date:   2018-09-07T08:35:32Z

WIP add verification on precondition "rows in iterator are sorted by key"

commit 39069ded62dc5836b0b0f7c8ec7fb8ce869e5292
Author: Jungtaek Lim 
Date:   2018-09-08T04:36:46Z

Rename SymmetricHashJoinStateManager to MultiValuesStateManager

* This will be also used from session window state as well

commit c2716340e008000e1fcc5e4d3fcf9befa419ff77
Author: Jungtaek Lim 
Date:   2018-09-08T04:41:37Z

Move package of UpdatingSessionIterator

commit df4cffd5fd1ea82be509f1cd97e5fc3a7ef8acb6
Author: Jungtaek Lim 
Date:   2018-09-10T05:52:28Z

WIP add MergingSortWithMultiValuesStateIterator, now integrating with 
stateful operators (WIP...)

commit 79e32b918c3db41c7d6c1c1d55276d3f696746d5
Author: Jungtaek Lim 
Date:   2018-09-13T06:54:37Z

WIP the first version of working one! Still have lots of TODOs and FIXMEs 
to go

commit fb7aa17488e5753c5460f383e1b0f4bedca6dee8
Author: Jungtaek Lim 
Date:   2018-09-13T08:13:45Z

Add more explanations

commit 9f41b9d6e7960031c52603bd1da9aeca747e1dfb
Author: Jungtaek Lim 
Date:   2018-09-13T08:49:01Z

Silly bugfix & block session window for batch query as of now

We can enable it but there're lots of approaches on aggregations in batch 
side...

* AggUtils.planAggregateWithoutDistinct
* AggUtils.planAggregateWithOneDistinct
* RewriteDistinctAggregates
* AggregateInPandasExec

So unless we are sure which things to support, just block them for now...

commit 0a62b1f0c274859061c0f3ab2c63450052985ac7
Author: Jungtaek Lim 
Date:   2018-09-13T09:28:34Z

More works: majorly split out updating session to individual physical node

* we will leverage such node for batch case if we want

commit acb5a0c42641041ca3adae2c9f2293b4dfa837cf
Author: Jungtaek Lim 
Date:   2018-09-13T09:38:00Z

Fix a silly bug and also add check for session window against batch query

commit 1b6502c92231b7aaa9d0d6f620a5bcc624b862ec
Author: Jungtaek Lim 
Date:   2018-09-13T11:30:15Z

WIP Fixed eviction on update mode

commit fec9a8ae5c1d421322738bd474fcb5508421f51a
Author: Jungtaek Lim 
Date:   2018-09-13T12:48:07Z

WIP found root reason of broken UT... fixed it

commit c87e4eebcc53c81328d52e4d4ea270bcede8b26e
Author: Jungtaek

[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-09-19 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-19 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r218955883
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get a pooled object while it is
+ * borrowed and not yet returned, hence providing thread-safe usage of the
+ * non-thread-safe [[InternalKafkaConsumer]] unless the caller shares the
+ * object across multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject wi

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-19 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r218777053
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get a pooled object while it is
+ * borrowed and not yet returned, hence providing thread-safe usage of the
+ * non-thread-safe [[InternalKafkaConsumer]] unless the caller shares the
+ * object across multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject wi

[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-09-19 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
> just wondering why org.apache.spark.sql.kafka010.CachedKafkaProducer uses 
com.google.common.cache.LoadingCache?

Because KafkaProducer is thread-safe (unless transactions are enabled), 
multiple tasks are encouraged to use a single instance concurrently. The 
consumer is not thread-safe, so we guarded it with custom logic, and this 
patch proposes to guard it with Apache Commons Pool instead.
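
For illustration only, a minimal sketch of that producer-side approach (a 
Guava LoadingCache keyed by the producer params) could look like the code 
below. The object name, type alias and capacity are assumptions for the 
sketch, not the connector's actual code.

    import java.{util => ju}

    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
    import org.apache.kafka.clients.producer.KafkaProducer

    object ProducerCacheSketch {
      type Producer = KafkaProducer[Array[Byte], Array[Byte]]

      private val loader = new CacheLoader[ju.Map[String, Object], Producer] {
        // Producers are thread-safe, so one instance per distinct config can
        // be shared by many tasks concurrently.
        override def load(params: ju.Map[String, Object]): Producer =
          new Producer(params)
      }

      // maximumSize(64) is an assumed capacity, not the connector's default.
      val cache: LoadingCache[ju.Map[String, Object], Producer] =
        CacheBuilder.newBuilder().maximumSize(64)
          .build[ju.Map[String, Object], Producer](loader)

      def getOrCreate(params: ju.Map[String, Object]): Producer =
        cache.get(params)
    }

The consumer cannot be shared that way, which is why this patch hands each 
borrowed consumer to a single task at a time via the pool.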


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-09-17 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
The vote for Spark 2.4 is now in progress. If we are not on standby for any 
blocker issues for the Spark 2.4 RC, I'd be really happy if someone could 
revisit this and continue reviewing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-09-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/22138
  
Regarding metrics in FetchedDataPool, I just added basic metrics so that 
tests can leverage them for verification. I was adding numActive as well as 
numIdle, but tracking and measuring them needs more resources, so I'd 
postpone that unless someone asks me to add it.
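
For reference, a minimal, hypothetical sketch of the kind of counters such a 
pool can expose for tests to assert against (these are not FetchedDataPool's 
actual fields or API):

    import java.util.concurrent.atomic.LongAdder

    // Cheap counters that only need an increment on create/evict; tracking
    // numActive/numIdle would require bookkeeping on every acquire/release,
    // which is the extra cost mentioned above.
    class PoolCounters {
      private val created = new LongAdder()
      private val evicted = new LongAdder()

      def recordCreated(): Unit = created.increment()
      def recordEvicted(): Unit = evicted.increment()

      def numCreated: Long = created.sum()
      def numEvicted: Long = evicted.sum()
    }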


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215867141
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetA

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215818860
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched 
data") {
+val dataPool = FetchedDataPool.build
+
  

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215637613
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetA

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215635068
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get a pooled object while it is
+ * borrowed and not yet returned, hence providing thread-safe usage of the
+ * non-thread-safe [[InternalKafkaConsumer]] unless the caller shares the
+ * object across multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject wi

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215313888
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get a pooled object while it is
+ * borrowed and not yet returned, hence providing thread-safe usage of the
+ * non-thread-safe [[InternalKafkaConsumer]] unless the caller shares the
+ * object across multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject wi

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215313215
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched 
data") {
+val dataPool = FetchedDataPool.build
+
  
