[GitHub] spark pull request #17012: [SPARK-19677][SS] Renaming a file atop an existin...

2017-02-27 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17012#discussion_r103361529
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
@@ -682,6 +684,21 @@ private[state] object StateStoreSuite {
 }
 
 /**
+  * Fake FileSystem that simulates HDFS rename semantic, i.e. renaming a 
file atop an existing
--- End diff --

nit: indent


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17012: [SPARK-19677][SS] Renaming a file atop an existin...

2017-02-27 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17012#discussion_r103360081
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
@@ -292,6 +285,15 @@ class StateStoreSuite extends SparkFunSuite with 
BeforeAndAfter with PrivateMeth
 assert(getDataFromFiles(provider, 19) === Set("a" -> 19))
   }
 
+  test("existing file handling") {
--- End diff --

nit: `SPARK-19677: Renaming a file atop an existing one on HDFS`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17012: [SPARK-19677][SS] Renaming a file atop an existin...

2017-02-27 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17012#discussion_r103359940
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -274,7 +274,9 @@ private[state] class HDFSBackedStateStoreProvider(
   private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: 
Path): Path = {
 synchronized {
   val finalDeltaFile = deltaFile(newVersion)
-  if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
+  // Renaming a file atop an existing one fails on HDFS, see
--- End diff --

Could you add our discussion to the comment? Such as
```
  // scalastyle:off
  // Renaming a file atop an existing one fails on HDFS
  // 
(http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html).
  // Hence we should either skip the rename step or delete the target 
file. Because deleting the
  // target file will beak speculation, skipping the rename step is the 
only choice. It's still
  // semantically correct because Structured Streaming requires 
rerunning a batch should
  // generate the same output. (SPARK-19677)
  // scalastyle:on
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17012: [SPARK-19677][SS] Renaming a file atop an existin...

2017-02-27 Thread vitillo
Github user vitillo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17012#discussion_r103219157
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -274,6 +274,11 @@ private[state] class HDFSBackedStateStoreProvider(
   private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: 
Path): Path = {
 synchronized {
   val finalDeltaFile = deltaFile(newVersion)
+  // Renaming a file atop an existing one fails on HDFS, see
+  // 
hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html
+  if (fs.exists(finalDeltaFile)) {
+fs.delete(finalDeltaFile, false)
--- End diff --

Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17012: [SPARK-19677][SS] Renaming a file atop an existin...

2017-02-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17012#discussion_r103059294
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -274,6 +274,11 @@ private[state] class HDFSBackedStateStoreProvider(
   private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: 
Path): Path = {
 synchronized {
   val finalDeltaFile = deltaFile(newVersion)
+  // Renaming a file atop an existing one fails on HDFS, see
+  // 
hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html
+  if (fs.exists(finalDeltaFile)) {
+fs.delete(finalDeltaFile, false)
--- End diff --

Actually, I think we should not delete the file. Otherwise, it will break 
speculation. E.g., there is also a speculation ask running. It wrote 
successfully and just reported to the driver that the task was successful. At 
the same time, another task just deleted the final file wrongly because the 
file exists.

The correct way is just skipping `rename`. We can depend on the basic 
assumption on Structured Streaming that the same batch should contain the same 
data.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17012: [SPARK-19677][SS] Renaming a file atop an existin...

2017-02-21 Thread vitillo
GitHub user vitillo opened a pull request:

https://github.com/apache/spark/pull/17012

[SPARK-19677][SS] Renaming a file atop an existing one should not fail on 
HDFS

## What changes were proposed in this pull request?

HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the 
local filesystem. According to the [implementation 
notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html)
 of `rename()`, the behavior of the local filesystem and HDFS varies:

> Destination exists and is a file
> Renaming a file atop an existing file is specified as failing, raising an 
exception.
>- Local FileSystem : the rename succeeds; the destination file is 
replaced by the source file.
>- HDFS : The rename fails, no exception is raised. Instead the method 
call simply returns false.

This patch ensures that the destination of `rename()` is overwritten also 
in HDFS.

## How was this patch tested?

This patch was tested by running `StateStoreSuite`.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vitillo/spark fix_rename

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17012.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17012


commit c985f39ad56e39943cff2341036e36919d8d063e
Author: Roberto Agostino Vitillo 
Date:   2017-02-21T07:34:28Z

Renaming a file atop an existing one should not fail on HDFS




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org