Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r235312035 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala --- @@ -74,6 +76,43 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging */ val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false) + /** + * The archive directory to move completed files. The option will be only effective when + * "cleanSource" is set to "archive". + * + * Note that the completed file will be moved to this archive directory with respecting to + * its own path. + * + * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive + * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt". + */ + val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir") + + /** + * Defines how to clean up completed files. Available options are "archive", "delete", "no_op". + */ + val cleanSource: CleanSourceMode.Value = { + val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString) + .toUpperCase(Locale.ROOT) + + val matchedModeOpt = CleanSourceMode.values.find(_.toString == modeStrOption) + if (matchedModeOpt.isEmpty) { --- End diff -- This can be simplified to something like: ``` matchedModeOpt match { case None => throw new IllegalArgumentException(s"Invalid mode for clean source option $modeStrOption." + s" Must be one of ${CleanSourceMode.values.mkString(",")}") case Some(matchedMode) => if (matchedMode == CleanSourceMode.ARCHIVE && sourceArchiveDir.isEmpty) { throw new IllegalArgumentException("Archive mode must be used with 'sourceArchiveDir' " + "option.") } matchedMode } ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org