[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237481604
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

I just modified the existing UT to include a space and % in the directory name as well as the file name.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237342362
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

Will address.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237342346
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -100,6 +101,36 @@ class FileStreamSource(

   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

+  ensureNoOverlapBetweenSourceAndArchivePath()
+
+  private def ensureNoOverlapBetweenSourceAndArchivePath(): Unit = {
+    @tailrec
+    def removeGlob(path: Path): Path = {
+      if (path.getName.contains("*")) {
+        removeGlob(path.getParent)
+      } else {
+        path
+      }
+    }
+
+    sourceOptions.sourceArchiveDir match {
+      case None =>
+      case Some(archiveDir) =>
+        val sourceUri = removeGlob(qualifiedBasePath).toUri
+        val archiveUri = new Path(archiveDir).toUri
+
+        val sourcePath = sourceUri.getPath
+        val archivePath = archiveUri.getPath
--- End diff --

Nice finding. Will address.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237342072
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

Yeah... actually I was somewhat confused about whether I have to escape/unescape the path. Thanks for the suggestion. Will address it and add a new unit test covering this.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237341854
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
+      try {
+        logDebug(s"Removing completed file $curPath")
+        fs.delete(curPath, false)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to remove $curPath / skip removing file.", e)
+      }
+    }
+
+    val logOffset = FileStreamSourceOffset(end).logOffset
+    metadataLog.get(logOffset) match {
--- End diff --

Ah, I didn't notice that. Thanks for letting me know! Will address.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237341425
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
--- End diff --

Yeah, I guess the patch prevents that case if it works as I expect, but I'm also in favor of defensive programming, and logging would be better for end users. Will address.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237340952
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
@@ -74,6 +76,39 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

+  /**
+   * The archive directory to move completed files. The option will be only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
+   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
--- End diff --

OK will address.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237340938
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -100,6 +101,36 @@ class FileStreamSource(

   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

+  ensureNoOverlapBetweenSourceAndArchivePath()
--- End diff --

Ah yes missed it. Will address.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237340601
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+cleanSource: option to clean up completed files after processing.
+Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
+Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"
+NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option would reduce the cost to list source files which is considered as a heavy operation.
+NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted which behavior may impact the other queries.
--- End diff --

Nice finding. I missed the case where multiple sources in the same query refer to the same file directory. Will address.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237319928
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

ditto


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237314690
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+cleanSource: option to clean up completed files after processing.
+Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
+Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"
+NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option would reduce the cost to list source files which is considered as a heavy operation.
+NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted which behavior may impact the other queries.
--- End diff --

NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237319903
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

`val curPath = new Path(new URI(entry.path))` to make it escape/unescape path properly. `entry.path` was created from `Path.toUri.toString`. Could you also add a unit test to test special paths such as `/a/b/a b%.txt`?
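
A minimal sketch of the round-trip issue (illustrative values, not taken from the PR's test):
```
import java.net.URI
import org.apache.hadoop.fs.Path

val original = new Path("/a/b/a b%.txt")
// FileEntry stores Path.toUri.toString, so special characters come back escaped.
val stored = original.toUri.toString        // "/a/b/a%20b%25.txt"

// Treating the escaped string as a raw path keeps the escapes as literal characters...
val wrong = new Path(stored)                // points at "a%20b%25.txt" -- a different file
// ...while going through URI first decodes them back to the original path.
val right = new Path(new URI(stored))       // points at "a b%.txt"
```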


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237319176
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
+      try {
+        logDebug(s"Removing completed file $curPath")
+        fs.delete(curPath, false)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to remove $curPath / skip removing file.", e)
+      }
+    }
+
+    val logOffset = FileStreamSourceOffset(end).logOffset
+    metadataLog.get(logOffset) match {
--- End diff --

you can use `val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)` to use the underlying cache in FileStreamSourceLog.
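
A rough sketch of how `commit` could then dispatch on the clean-up mode, reusing the `move`/`remove` helpers and option names from this diff (the ARCHIVE branch assumes `baseArchiveDirPath` is derived from `sourceArchiveDir`):
```
val logOffset = FileStreamSourceOffset(end).logOffset
// The ranged get goes through FileStreamSourceLog's cache instead of
// re-reading the (possibly compacted) metadata log for a single batch.
val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)

sourceOptions.cleanSource match {
  case CleanSourceMode.ARCHIVE => files.foreach(move(_, baseArchiveDirPath))
  case CleanSourceMode.DELETE  => files.foreach(remove)
  case CleanSourceMode.NO_OP   => // leave completed files in place
}
```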


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237200636
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
@@ -74,6 +76,39 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

+  /**
+   * The archive directory to move completed files. The option will be only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
+   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
--- End diff --

nit: could you create a method in `CleanSourceMode` to convert a string to `CleanSourceMode.Value`?
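
For instance, a sketch of what that helper could look like (the name `fromString` is an assumption, not from the PR):
```
import java.util.Locale

object CleanSourceMode extends Enumeration {
  val ARCHIVE, DELETE, NO_OP = Value

  // Centralizes the string-to-mode conversion so FileStreamOptions only
  // needs a single call.
  def fromString(mode: String): Value = {
    val normalized = mode.toUpperCase(Locale.ROOT)
    values.find(_.toString == normalized).getOrElse {
      throw new IllegalArgumentException(s"Invalid mode for clean source option $mode." +
        s" Must be one of ${values.mkString(",")}")
    }
  }
}
```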


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237315173
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -100,6 +101,36 @@ class FileStreamSource(

   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

+  ensureNoOverlapBetweenSourceAndArchivePath()
--- End diff --
--- End diff --

Could you do this check only when CleanSourceMode is `ARCHIVE`?
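
Roughly (using the names as they appear in this diff):
```
// DELETE and NO_OP never touch sourceArchiveDir, so the overlap check
// only matters when files will actually be archived.
if (sourceOptions.cleanSource == CleanSourceMode.ARCHIVE) {
  ensureNoOverlapBetweenSourceAndArchivePath()
}
```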


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237315718
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
--- End diff --

It's better to also check the return value of `rename`. A user may reuse a source archive dir and cause path conflicts. We should also log this.
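
For example, a sketch inside the `move` helper above (the log message is illustrative):
```
// rename() returns false rather than throwing on many filesystems when the
// destination already exists, e.g. when an archive dir is reused across runs.
if (!fs.rename(curPath, newPath)) {
  logWarning(s"Fail to move $curPath to $newPath: rename returned false, " +
    "possibly because the destination already exists. Skip moving file.")
}
```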


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237320515
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -100,6 +101,36 @@ class FileStreamSource(

   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

+  ensureNoOverlapBetweenSourceAndArchivePath()
+
+  private def ensureNoOverlapBetweenSourceAndArchivePath(): Unit = {
+    @tailrec
+    def removeGlob(path: Path): Path = {
+      if (path.getName.contains("*")) {
+        removeGlob(path.getParent)
+      } else {
+        path
+      }
+    }
+
+    sourceOptions.sourceArchiveDir match {
+      case None =>
+      case Some(archiveDir) =>
+        val sourceUri = removeGlob(qualifiedBasePath).toUri
+        val archiveUri = new Path(archiveDir).toUri
+
+        val sourcePath = sourceUri.getPath
+        val archivePath = archiveUri.getPath
--- End diff --

we need to use `fs.makeQualified` to turn all user provided paths to absolute paths as the user may just pass a relative path.
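
That is, something like (a sketch using the names from this diff):
```
// Qualify both paths against the filesystem so relative paths and missing
// scheme/authority are resolved before the overlap comparison.
val sourcePath = fs.makeQualified(removeGlob(qualifiedBasePath)).toUri.getPath
val archivePath = fs.makeQualified(new Path(archiveDir)).toUri.getPath
```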


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237314459
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+cleanSource: option to clean up completed files after processing.
+Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
+Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"
+NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option would reduce the cost to list source files which is considered as a heavy operation.
+NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted which behavior may impact the other queries.
--- End diff --

NOTE 2: The source path should not be used from multiple **sources or** queries when enabling this option, because source files will be moved or deleted which behavior may impact the other **sources and** queries.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r235632761
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+cleanSource: option to clean up completed files after processing.
+Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
--- End diff --

Yeah, I guess you're right. I'll add logic to check this during initialization of FileStreamSource.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r235632872
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
@@ -74,6 +76,43 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

+  /**
+   * The archive directory to move completed files. The option will be only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
+   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
+      .toUpperCase(Locale.ROOT)
+
+    val matchedModeOpt = CleanSourceMode.values.find(_.toString == modeStrOption)
+    if (matchedModeOpt.isEmpty) {
--- End diff --

Will address.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r235632809
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +258,64 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      if (!fs.exists(newPath.getParent)) {
--- End diff --

Nice finding. Will address.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r235314493
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +258,64 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      if (!fs.exists(newPath.getParent)) {
--- End diff --

These fs operations can also throw exceptions. Why not cover these with a try as well?


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r235312035
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
@@ -74,6 +76,43 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

+  /**
+   * The archive directory to move completed files. The option will be only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
+   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
+      .toUpperCase(Locale.ROOT)
+
+    val matchedModeOpt = CleanSourceMode.values.find(_.toString == modeStrOption)
+    if (matchedModeOpt.isEmpty) {
--- End diff --

This can be simplified to something like:
```
matchedModeOpt match {
  case None =>
    throw new IllegalArgumentException(s"Invalid mode for clean source option $modeStrOption." +
      s" Must be one of ${CleanSourceMode.values.mkString(",")}")
  case Some(matchedMode) =>
    if (matchedMode == CleanSourceMode.ARCHIVE && sourceArchiveDir.isEmpty) {
      throw new IllegalArgumentException("Archive mode must be used with 'sourceArchiveDir' " +
        "option.")
    }
    matchedMode
}
```


---
