[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79747197

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala ---
@@ -79,213 +76,46 @@ object SinkFileStatus {
  * When the reader uses `allFiles` to list all files, this method only returns the visible files
  * (drops the deleted files).
  */
-class FileStreamSinkLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
-
-  import FileStreamSinkLog._
+class FileStreamSinkLog(
+    metadataLogVersion: String,
+    sparkSession: SparkSession,
+    path: String)
+  extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
   private implicit val formats = Serialization.formats(NoTypeHints)
-  /**
-   * If we delete the old files after compaction at once, there is a race condition in S3: other
-   * processes may see the old files are deleted but still cannot see the compaction file using
-   * "list". The `allFiles` handles this by looking for the next compaction file directly, however,
-   * a live lock may happen if the compaction happens too frequently: one processing keeps deleting
-   * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
-   */
-  private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
+  protected override val fileCleanupDelayMs =
--- End diff --

Oh, sorry about it, will fix it now.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79668559

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala ---
@@ -79,213 +76,46 @@ object SinkFileStatus {
  * When the reader uses `allFiles` to list all files, this method only returns the visible files
  * (drops the deleted files).
  */
-class FileStreamSinkLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
-
-  import FileStreamSinkLog._
+class FileStreamSinkLog(
+    metadataLogVersion: String,
+    sparkSession: SparkSession,
+    path: String)
+  extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
   private implicit val formats = Serialization.formats(NoTypeHints)
-  /**
-   * If we delete the old files after compaction at once, there is a race condition in S3: other
-   * processes may see the old files are deleted but still cannot see the compaction file using
-   * "list". The `allFiles` handles this by looking for the next compaction file directly, however,
-   * a live lock may happen if the compaction happens too frequently: one processing keeps deleting
-   * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
-   */
-  private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
+  protected override val fileCleanupDelayMs =
--- End diff --

I just noticed some conflicts here. Could you submit a follow-up PR that uses the previous `sparkSession.sessionState.conf.fileSinkLogCleanupDelay`, the same as the other confs? This only exists in the master branch, so we don't need to fix branch 2.0.
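The Scaladoc removed in the diff above explains why old log files should not be deleted immediately after compaction. A minimal sketch of that idea follows, assuming a hypothetical `LogFileSketch` record and a hypothetical `expiredFiles` helper (neither is part of Spark): a file becomes a deletion candidate only if the compaction file already covers it and it has been untouched for at least the cleanup delay.

```scala
// Hypothetical stand-in for one metadata log file; not a Spark class.
final case class LogFileSketch(batchId: Long, modificationTimeMs: Long)

object CleanupDelaySketch {
  // Returns the files that are both covered by the compaction batch and
  // older than the cleanup delay. Everything else is kept so that readers
  // still listing the directory have time to see the compaction file.
  def expiredFiles(
      files: Seq[LogFileSketch],
      compactionBatchId: Long,
      nowMs: Long,
      cleanupDelayMs: Long): Seq[LogFileSketch] = {
    val cutoffMs = nowMs - cleanupDelayMs
    files.filter { f =>
      f.batchId < compactionBatchId && f.modificationTimeMs <= cutoffMs
    }
  }
}
```

With a delay of zero every covered file is eligible at once, which is the situation the removed comment warns about.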
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/13513
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79530152

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.{LinkedHashMap => JLinkedHashMap}
+import java.util.Map.Entry
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+    metadataLogVersion: String,
+    sparkSession: SparkSession,
+    path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+      sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+    s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
+      s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log entry cache to cache the file entries belong to the compaction batch. It is
+  // used to avoid scanning the compacted log file to retrieve it's own batch data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new JLinkedHashMap[Long, Array[FileEntry]] {
+    override def removeEldestEntry(eldest: Entry[Long, Array[FileEntry]]): Boolean = {
+      size() > cacheSize
+    }
+  }
+
+  protected override def serializeData(data: FileEntry): String = {
+    Serialization.write(data)
+  }
+
+  protected override def deserializeData(encodedString: String): FileEntry = {
+    Serialization.read[FileEntry](encodedString)
+  }
+
+  def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
+    logs
+  }
+
+  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
+    if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) {
--- End diff --

Yes, you're right. I will fix it.
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r79518675 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry +import org.apache.spark.sql.internal.SQLConf + +class FileStreamSourceLog( +metadataLogVersion: String, +sparkSession: SparkSession, +path: String) + extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) { + + import CompactibleFileStreamLog._ + + // Configurations about metadata compaction + protected override val compactInterval = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, +s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + protected override val fileCleanupDelayMs = +sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + protected override val isDeletingExpiredLog = +sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private implicit val formats = Serialization.formats(NoTypeHints) + + // A fixed size log cache to cache the file entries belong to the compaction batch. It is used + // to avoid scanning the compacted log file to retrieve it's own batch data. 
+ private val cacheSize = compactInterval + private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[FileEntry]] + + private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = { +if (fileEntryCache.size >= cacheSize) { + fileEntryCache.drop(1) +} + +fileEntryCache.put(batchId, logs) + } + + protected override def serializeData(data: FileEntry): String = { +Serialization.write(data) + } + + protected override def deserializeData(encodedString: String): FileEntry = { +Serialization.read[FileEntry](encodedString) + } + + def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = { +logs + } + + override def add(batchId: Long, logs: Array[FileEntry]): Boolean = { +if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) { + updateCache(batchId, logs) + true +} else if (!isCompactionBatch(batchId, compactInterval)) { + true +} else { + false +} + } + + override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = { +val startBatchId = startId.getOrElse(0L) +val endBatchId = getLatest().map(_._1).getOrElse(0L) + +val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id => + if (isCompactionBatch(id, compactInterval) && fileEntryCache.contains(id)) { +(id, Some(fileEntryCache(id))) + } else { +val logs = super.get(id).map(_.filter(_.batchId == id)) +(id, logs) + } +}.partition(_._2.isDefined) + +// The below code may only be happened when original metadata log file has been removed, so we +// have to get the batch from latest compacted log file. This is quite time-consuming and may +// not be happened in the current FileStreamSource code path, since we only fetch the +// latest metadata log file. +val searchKeys = removedBatches.map(_._1) +val retrievedBatches = if (searchKeys.nonEmpty) { + logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!") + val latestBatchId =
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79518445

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.{LinkedHashMap => JLinkedHashMap}
+import java.util.Map.Entry
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+    metadataLogVersion: String,
+    sparkSession: SparkSession,
+    path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+      sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+    s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
+      s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log entry cache to cache the file entries belong to the compaction batch. It is
+  // used to avoid scanning the compacted log file to retrieve it's own batch data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new JLinkedHashMap[Long, Array[FileEntry]] {
+    override def removeEldestEntry(eldest: Entry[Long, Array[FileEntry]]): Boolean = {
+      size() > cacheSize
+    }
+  }
+
+  protected override def serializeData(data: FileEntry): String = {
+    Serialization.write(data)
+  }
+
+  protected override def deserializeData(encodedString: String): FileEntry = {
+    Serialization.read[FileEntry](encodedString)
+  }
+
+  def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
+    logs
+  }
+
+  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
+    if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) {
--- End diff --

This is wrong. If `super.add(batchId, logs)` is false, then we should always return `false`.
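One way to restructure `add` so that a failed `super.add` always yields `false` is sketched below. This is an illustration under stated assumptions, not the code that was merged: `BaseLogSketch` and `CachingLogSketch` are hypothetical stand-ins for the real log classes, entries are plain strings, and the compaction rule `(batchId + 1) % compactInterval == 0` is assumed rather than taken from the quoted diff.

```scala
import scala.collection.mutable

// Hypothetical parent log: models only the Boolean contract of `add`.
class BaseLogSketch {
  private val written = mutable.Set.empty[Long]

  // Returns false when the batch id was already written.
  def add(batchId: Long, logs: Array[String]): Boolean = written.add(batchId)
}

// Hypothetical child log that caches the entries of compaction batches.
class CachingLogSketch(compactInterval: Int) extends BaseLogSketch {
  val cache = mutable.Map.empty[Long, Array[String]]

  // Assumed rule: every compactInterval-th batch is a compaction batch.
  private def isCompactionBatch(batchId: Long): Boolean =
    (batchId + 1) % compactInterval == 0

  override def add(batchId: Long, logs: Array[String]): Boolean = {
    if (super.add(batchId, logs)) {
      // Only a successful write may touch the cache.
      if (isCompactionBatch(batchId)) {
        cache.put(batchId, logs)
      }
      true
    } else {
      // A failed write is reported as-is, whatever the batch kind.
      false
    }
  }
}
```

The version under review returned `true` for any non-compaction batch, even when the parent write had failed. Checking the parent result first removes that branch.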
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r79099764 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry +import org.apache.spark.sql.internal.SQLConf + +class FileStreamSourceLog( +metadataLogVersion: String, +sparkSession: SparkSession, +path: String) + extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) { + + import CompactibleFileStreamLog._ + + // Configurations about metadata compaction + protected override val compactInterval = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, +s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + protected override val fileCleanupDelayMs = +sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + protected override val isDeletingExpiredLog = +sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private implicit val formats = Serialization.formats(NoTypeHints) + + // A fixed size log cache to cache the file entries belong to the compaction batch. It is used + // to avoid scanning the compacted log file to retrieve it's own batch data. 
+ private val cacheSize = compactInterval + private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[FileEntry]] + + private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = { +if (fileEntryCache.size >= cacheSize) { + fileEntryCache.drop(1) +} + +fileEntryCache.put(batchId, logs) + } + + protected override def serializeData(data: FileEntry): String = { +Serialization.write(data) + } + + protected override def deserializeData(encodedString: String): FileEntry = { +Serialization.read[FileEntry](encodedString) + } + + def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = { +logs + } + + override def add(batchId: Long, logs: Array[FileEntry]): Boolean = { +if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) { + updateCache(batchId, logs) + true +} else if (!isCompactionBatch(batchId, compactInterval)) { + true +} else { + false +} + } + + override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = { +val startBatchId = startId.getOrElse(0L) +val endBatchId = getLatest().map(_._1).getOrElse(0L) + +val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id => + if (isCompactionBatch(id, compactInterval) && fileEntryCache.contains(id)) { +(id, Some(fileEntryCache(id))) + } else { +val logs = super.get(id).map(_.filter(_.batchId == id)) +(id, logs) + } +}.partition(_._2.isDefined) + +// The below code may only be happened when original metadata log file has been removed, so we +// have to get the batch from latest compacted log file. This is quite time-consuming and may +// not be happened in the current FileStreamSource code path, since we only fetch the +// latest metadata log file. +val searchKeys = removedBatches.map(_._1) +val retrievedBatches = if (searchKeys.nonEmpty) { + logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!") + val latestBatchId =
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79093102

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+    metadataLogVersion: String,
+    sparkSession: SparkSession,
+    path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+      sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+    s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
+      s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log cache to cache the file entries belong to the compaction batch. It is used
+  // to avoid scanning the compacted log file to retrieve it's own batch data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[FileEntry]]
+
+  private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = {
+    if (fileEntryCache.size >= cacheSize) {
+      fileEntryCache.drop(1)
--- End diff --

I see, sorry for this issue.
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79023964

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+    metadataLogVersion: String,
+    sparkSession: SparkSession,
+    path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+      sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+    s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
+      s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log cache to cache the file entries belong to the compaction batch. It is used
+  // to avoid scanning the compacted log file to retrieve it's own batch data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[FileEntry]]
+
+  private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = {
+    if (fileEntryCache.size >= cacheSize) {
+      fileEntryCache.drop(1)
--- End diff --

`drop` doesn't change the original map.

```Scala
scala> val m = scala.collection.mutable.LinkedHashMap[Int, Int]()
m: scala.collection.mutable.LinkedHashMap[Int,Int] = Map()

scala>

scala> m(2) = 1

scala> m
res1: scala.collection.mutable.LinkedHashMap[Int,Int] = Map(2 -> 1)

scala> m.drop(1)
res2: scala.collection.mutable.LinkedHashMap[Int,Int] = Map()

scala> m
res3: scala.collection.mutable.LinkedHashMap[Int,Int] = Map(2 -> 1)
```

I think it should be a Java `LinkedHashMap`. Here is an example: https://github.com/apache/spark/blob/03d46aafe561b03e25f4e25cf01e631c18dd827c/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#L45
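The difference between the two map types can be sketched as follows. This is a minimal illustration rather than the code from the pull request: `BoundedCache` and `BoundedCacheSketch` are hypothetical names, and the cache holds strings instead of `Array[FileEntry]`. It relies on `java.util.LinkedHashMap.removeEldestEntry`, which the map calls after each insertion and which evicts the eldest entry in place when it returns `true`.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry
import scala.collection.mutable

// Hypothetical helper for illustration; not a class from the Spark code base.
class BoundedCache[K, V](maxEntries: Int) extends JLinkedHashMap[K, V] {
  // Invoked by put() after each insertion. Returning true removes the
  // eldest (first inserted) entry from this map.
  override def removeEldestEntry(eldest: Entry[K, V]): Boolean =
    size() > maxEntries
}

object BoundedCacheSketch {
  // `drop` builds a new collection, so the receiver keeps its entries.
  def scalaMapSizeAfterDrop(): Int = {
    val m = mutable.LinkedHashMap(2 -> 1)
    val dropped = m.drop(1) // a new, empty map; `m` is untouched
    assert(dropped.isEmpty)
    m.size
  }

  // Inserts three batches into a cache capped at two entries and
  // returns the keys that remain, in insertion order.
  def javaCacheKeysAfterThreePuts(): List[Long] = {
    val cache = new BoundedCache[Long, String](2)
    cache.put(0L, "batch-0")
    cache.put(1L, "batch-1")
    cache.put(2L, "batch-2") // size would be 3, so key 0 is evicted
    var keys = List.empty[Long]
    val it = cache.keySet().iterator()
    while (it.hasNext) keys = keys :+ it.next()
    keys
  }
}
```

Staying with the Scala map would also work if the eldest key were removed explicitly, for example with `m.remove(m.head._1)`. The Java subclass keeps the eviction rule next to the cache definition.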
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r79027018 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry +import org.apache.spark.sql.internal.SQLConf + +class FileStreamSourceLog( +metadataLogVersion: String, +sparkSession: SparkSession, +path: String) + extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) { + + import CompactibleFileStreamLog._ + + // Configurations about metadata compaction + protected override val compactInterval = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, +s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + protected override val fileCleanupDelayMs = +sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + protected override val isDeletingExpiredLog = +sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private implicit val formats = Serialization.formats(NoTypeHints) + + // A fixed size log cache to cache the file entries belong to the compaction batch. It is used + // to avoid scanning the compacted log file to retrieve it's own batch data. 
+ private val cacheSize = compactInterval + private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[FileEntry]] + + private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = { +if (fileEntryCache.size >= cacheSize) { + fileEntryCache.drop(1) +} + +fileEntryCache.put(batchId, logs) + } + + protected override def serializeData(data: FileEntry): String = { +Serialization.write(data) + } + + protected override def deserializeData(encodedString: String): FileEntry = { +Serialization.read[FileEntry](encodedString) + } + + def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = { +logs + } + + override def add(batchId: Long, logs: Array[FileEntry]): Boolean = { +if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) { + updateCache(batchId, logs) + true +} else if (!isCompactionBatch(batchId, compactInterval)) { + true +} else { + false +} + } + + override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = { +val startBatchId = startId.getOrElse(0L) +val endBatchId = getLatest().map(_._1).getOrElse(0L) + +val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id => + if (isCompactionBatch(id, compactInterval) && fileEntryCache.contains(id)) { +(id, Some(fileEntryCache(id))) + } else { +val logs = super.get(id).map(_.filter(_.batchId == id)) +(id, logs) + } +}.partition(_._2.isDefined) + +// The below code may only be happened when original metadata log file has been removed, so we +// have to get the batch from latest compacted log file. This is quite time-consuming and may +// not be happened in the current FileStreamSource code path, since we only fetch the +// latest metadata log file. +val searchKeys = removedBatches.map(_._1) +val retrievedBatches = if (searchKeys.nonEmpty) { + logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!") + val latestBatchId =
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r78630727 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala --- @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.IOException +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.concurrent.TimeUnit + +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.{Path, PathFilter} + +import org.apache.spark.sql.SparkSession + +/** + * An abstract class for compactible metadata logs. It will write one log file for each batch. + * The first line of the log file is the version number, and there are multiple JSON lines + * following. + * + * As reading from many small files is usually pretty slow, also too many + * small files in one folder will mess the FS, [[CompactibleFileStreamLog]] will + * compact log files every 10 batches by default into a big file. When + * doing a compaction, it will read all old log files and merge them with the new batch. 
+ */ +abstract class CompactibleFileStreamLog[T: ClassTag]( +sparkSession: SparkSession, +path: String) + extends HDFSMetadataLog[Array[T]](sparkSession, path) { + + import CompactibleFileStreamLog._ + + /** + * If we delete the old files after compaction at once, there is a race condition in S3: other + * processes may see the old files are deleted but still cannot see the compaction file using + * "list". The `allFiles` handles this by looking for the next compaction file directly, however, + * a live lock may happen if the compaction happens too frequently: one processing keeps deleting + * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it. + */ + protected val fileCleanupDelayMs = TimeUnit.MINUTES.toMillis(10) + + protected val isDeletingExpiredLog = true + + protected val compactInterval = 10 + + /** + * Serialize the data into encoded string. + */ + protected def serializeData(t: T): String + + /** + * Deserialize the string into data object. + */ + protected def deserializeData(encodedString: String): T + + /** + * Filter out the unwanted logs, by default it filters out nothing, inherited class could + * override this method to do filtering. 
+ */ + protected def compactLogs(oldLogs: Seq[T], newLogs: Seq[T]): Seq[T] = { +oldLogs ++ newLogs + } + + override def batchIdToPath(batchId: Long): Path = { +if (isCompactionBatch(batchId, compactInterval)) { + new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX") +} else { + new Path(metadataPath, batchId.toString) +} + } + + override def pathToBatchId(path: Path): Long = { +getBatchIdFromFileName(path.getName) + } + + override def isBatchFile(path: Path): Boolean = { +try { + getBatchIdFromFileName(path.getName) + true +} catch { + case _: NumberFormatException => false +} + } + + override def serialize(logData: Array[T]): Array[Byte] = { +(VERSION +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8) --- End diff -- Could you make VERSION a constructor parameter so that the source and sink log formats can be changed separately?
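One way to read this suggestion is sketched below: the version string becomes a constructor argument, so a source log and a sink log can each carry their own format version. This is a standalone illustration with no Spark dependencies. `VersionedLog`, `StringLog`, and the version strings are hypothetical names, not the PR's code.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for a compactible log: each subclass supplies its version.
abstract class VersionedLog[T](metadataLogVersion: String) {
  protected def serializeData(t: T): String

  // The first line is the version, followed by one serialized entry per line.
  def serialize(logData: Array[T]): Array[Byte] =
    (metadataLogVersion +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8)
}

// Trivial subclass whose entries are already strings.
class StringLog(version: String) extends VersionedLog[String](version) {
  protected def serializeData(t: String): String = t
}
```

Two instances such as `new StringLog("v1")` and `new StringLog("v2")` would then write different header lines without sharing a global constant.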
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r76499068 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -129,3 +131,86 @@ class FileStreamSource( override def toString: String = s"FileStreamSource[$qualifiedBasePath]" } + +class FileStreamSourceLog(sparkSession: SparkSession, path: String) + extends HDFSMetadataLog[Seq[String]](sparkSession, path) { + + // Configurations about metadata compaction + private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, +s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private var compactBatchId: Long = -1L + + private def isCompactionBatch(batchId: Long, compactInterval: Long): Boolean = { +batchId % compactInterval == 0 + } + + override def add(batchId: Long, metadata: Seq[String]): Boolean = { +if (isCompactionBatch(batchId, compactInterval)) { + compactMetadataLog(batchId - 1) +} + +super.add(batchId, metadata) + } + + private def compactMetadataLog(batchId: Long): Unit = { +// read out compact metadata and merge with new metadata. +val batches = super.get(Some(compactBatchId), Some(batchId)) +val totalMetadata = batches.flatMap(_._2) +if (totalMetadata.isEmpty) { + return +} + +// Remove old compact metadata file and rewrite. 
+val renamedPath = new Path(path, s".${batchId.toString}-${UUID.randomUUID.toString}.tmp") +fileManager.rename(batchIdToPath(batchId), renamedPath) + +var isSuccess = false +try { + isSuccess = super.add(batchId, totalMetadata) +} catch { + case NonFatal(e) => isSuccess = false +} finally { + if (!isSuccess) { +// Rollback to the previous status if compaction is failed. --- End diff -- This rollback code will not execute if the process exits during a compaction operation. You will need cleanup code in the class constructor to handle that case.
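A rough sketch of what such startup cleanup could look like, assuming a local directory and the `.<batchId>-<uuid>.tmp` naming used by the rename in the diff. `CompactionRecovery` and its recovery policy are hypothetical: the sketch restores the old file when the rewritten batch file is missing and otherwise discards the temp copy. It uses `java.io.File` rather than the PR's `fileManager`.

```scala
import java.io.File

object CompactionRecovery {
  // Matches names like ".9-<uuid>.tmp" and captures the batch id.
  private val TempName = """\.(\d+)-.+\.tmp""".r

  /** Returns the number of batch files restored from leftover temp files. */
  def recover(logDir: File): Int = {
    val files = Option(logDir.listFiles()).getOrElse(Array.empty[File])
    var restored = 0
    files.foreach { f =>
      f.getName match {
        case TempName(batchId) =>
          val batchFile = new File(logDir, batchId)
          if (batchFile.exists()) {
            f.delete() // the rewrite completed: the temp copy is obsolete
          } else if (f.renameTo(batchFile)) {
            restored += 1 // the rewrite never completed: roll back to the old file
          }
        case _ => // not a compaction temp file, leave it alone
      }
    }
    restored
  }
}
```

Calling `CompactionRecovery.recover(dir)` from a constructor would cover the case where the process exited between the rename and the rewrite, which a `finally` block cannot handle.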
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r65945436 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -129,3 +131,86 @@ class FileStreamSource( override def toString: String = s"FileStreamSource[$qualifiedBasePath]" } + +class FileStreamSourceLog(sparkSession: SparkSession, path: String) + extends HDFSMetadataLog[Seq[String]](sparkSession, path) { + + // Configurations about metadata compaction + private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, +s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private var compactBatchId: Long = -1L + + private def isCompactionBatch(batchId: Long, compactInterval: Long): Boolean = { +batchId % compactInterval == 0 + } + + override def add(batchId: Long, metadata: Seq[String]): Boolean = { +if (isCompactionBatch(batchId, compactInterval)) { + compactMetadataLog(batchId - 1) +} + +super.add(batchId, metadata) + } + + private def compactMetadataLog(batchId: Long): Unit = { +// read out compact metadata and merge with new metadata. +val batches = super.get(Some(compactBatchId), Some(batchId)) +val totalMetadata = batches.flatMap(_._2) +if (totalMetadata.isEmpty) { + return +} + +// Remove old compact metadata file and rewrite. +val renamedPath = new Path(path, s".${batchId.toString}-${UUID.randomUUID.toString}.tmp") +fileManager.rename(batchIdToPath(batchId), renamedPath) + +var isSuccess = false +try { + isSuccess = super.add(batchId, totalMetadata) +} catch { + case NonFatal(e) => isSuccess = false --- End diff -- Yeah, it is not necessary, I will remove it. 
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r65825524 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -529,7 +529,28 @@ object SQLConf { .internal() .doc("How long in milliseconds a file is guaranteed to be visible for all readers.") .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(60 * 1000L) // 10 minutes + .createWithDefault(60 * 10 * 1000L) // 10 minutes + + val FILE_SOURCE_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSource.log.deletion") +.internal() +.doc("Whether to delete the expired log files in file stream source.") +.booleanConf +.createWithDefault(true) + + val FILE_SOURCE_LOG_COMPACT_INTERVAL = +SQLConfigBuilder("spark.sql.streaming.fileSource.log.compactInterval") + .internal() + .doc("Number of log files after which all the previous files " + +"are compacted into the next log file.") + .intConf + .createWithDefault(10) + + val FILE_SOURCE_LOG_CLEANUP_DELAY = +SQLConfigBuilder("spark.sql.streaming.fileSource.log.cleanupDelay") + .internal() + .doc("How long in milliseconds a file is guaranteed to be visible for all readers.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(60 * 10 * 1000L) // 10 minutes --- End diff -- A nitpick, but I think it'd be easier to "decode" as `10 * 60 * 1000L`.
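For comparison, the same default can be written so that the unit is explicit. A small sketch using only `java.util.concurrent.TimeUnit`; the object and value names are hypothetical.

```scala
import java.util.concurrent.TimeUnit

object CleanupDelayDefaults {
  // Reads left to right as "10 minutes, 60 seconds each, 1000 ms each".
  val byArithmetic: Long = 10 * 60 * 1000L

  // States the unit directly instead of relying on a trailing comment.
  val byTimeUnit: Long = TimeUnit.MINUTES.toMillis(10)
}
```

Both values are equal, so the choice is purely about readability.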
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r65825474 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -129,3 +131,86 @@ class FileStreamSource( override def toString: String = s"FileStreamSource[$qualifiedBasePath]" } + +class FileStreamSourceLog(sparkSession: SparkSession, path: String) + extends HDFSMetadataLog[Seq[String]](sparkSession, path) { + + // Configurations about metadata compaction + private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, +s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private var compactBatchId: Long = -1L + + private def isCompactionBatch(batchId: Long, compactInterval: Long): Boolean = { +batchId % compactInterval == 0 + } + + override def add(batchId: Long, metadata: Seq[String]): Boolean = { +if (isCompactionBatch(batchId, compactInterval)) { + compactMetadataLog(batchId - 1) +} + +super.add(batchId, metadata) + } + + private def compactMetadataLog(batchId: Long): Unit = { +// read out compact metadata and merge with new metadata. +val batches = super.get(Some(compactBatchId), Some(batchId)) +val totalMetadata = batches.flatMap(_._2) +if (totalMetadata.isEmpty) { + return +} + +// Remove old compact metadata file and rewrite. 
+val renamedPath = new Path(path, s".${batchId.toString}-${UUID.randomUUID.toString}.tmp") +fileManager.rename(batchIdToPath(batchId), renamedPath) + +var isSuccess = false +try { + isSuccess = super.add(batchId, totalMetadata) +} catch { + case NonFatal(e) => isSuccess = false --- End diff -- Why are you setting `isSuccess` to `false` since it's `false` already?
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r65825480 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -129,3 +131,86 @@ class FileStreamSource( override def toString: String = s"FileStreamSource[$qualifiedBasePath]" } + +class FileStreamSourceLog(sparkSession: SparkSession, path: String) + extends HDFSMetadataLog[Seq[String]](sparkSession, path) { + + // Configurations about metadata compaction + private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, +s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private var compactBatchId: Long = -1L + + private def isCompactionBatch(batchId: Long, compactInterval: Long): Boolean = { +batchId % compactInterval == 0 + } + + override def add(batchId: Long, metadata: Seq[String]): Boolean = { +if (isCompactionBatch(batchId, compactInterval)) { + compactMetadataLog(batchId - 1) +} + +super.add(batchId, metadata) + } + + private def compactMetadataLog(batchId: Long): Unit = { +// read out compact metadata and merge with new metadata. +val batches = super.get(Some(compactBatchId), Some(batchId)) +val totalMetadata = batches.flatMap(_._2) +if (totalMetadata.isEmpty) { + return +} + +// Remove old compact metadata file and rewrite. 
+val renamedPath = new Path(path, s".${batchId.toString}-${UUID.randomUUID.toString}.tmp") +fileManager.rename(batchIdToPath(batchId), renamedPath) + +var isSuccess = false +try { + isSuccess = super.add(batchId, totalMetadata) +} catch { + case NonFatal(e) => isSuccess = false +} finally { + if (!isSuccess) { +// Rollback to the previous status if compaction is failed. --- End diff -- s/status/state ?
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r65825440 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -129,3 +131,86 @@ class FileStreamSource( override def toString: String = s"FileStreamSource[$qualifiedBasePath]" } + +class FileStreamSourceLog(sparkSession: SparkSession, path: String) + extends HDFSMetadataLog[Seq[String]](sparkSession, path) { + + // Configurations about metadata compaction + private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, +s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + --- End diff -- I'd move `(was $compactInterval)` to the end of the message.
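A sketch of the message with the current value moved to the end, written without Spark so it can run on its own. `CompactIntervalCheck` and `Key` are hypothetical; `Key` stands in for `SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key`.

```scala
object CompactIntervalCheck {
  // Stand-in for SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key.
  val Key = "spark.sql.streaming.fileSource.log.compactInterval"

  // Returns the interval unchanged, or throws IllegalArgumentException if it is not positive.
  def validate(compactInterval: Int): Int = {
    require(compactInterval > 0,
      s"Please set $Key to a positive value (was $compactInterval).")
    compactInterval
  }
}
```

Placing the offending value last keeps the instruction readable as one uninterrupted sentence.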
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
GitHub user jerryshao opened a pull request: https://github.com/apache/spark/pull/13513

[SPARK-15698][SQL][Streaming] Add the ability to remove the old MetadataLog in FileStreamSource

## What changes were proposed in this pull request?

The current `metadataLog` in `FileStreamSource` adds a checkpoint file for each batch but has no way to remove or compact old files, which leads to a large number of small files when a query runs for a long time. This pull request proposes compacting the old logs into one file. The approach is similar to `FileStreamSinkLog` but simpler.

## How was this patch tested?

Unit test added.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-15698

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13513.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #13513

commit 2ed1115966fdf8b6a8fba990599b230a04e00649
Author: jerryshao
Date: 2016-06-02T03:14:48Z

Add the ability to remove the old MetadataLog in FileStreamSource
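To make the compaction schedule concrete, here is a small sketch built on the predicate quoted in the first revision of the diff (`batchId % compactInterval == 0`). `CompactionSchedule` and `compactionBatches` are hypothetical helpers for illustration only; later revisions of the PR may use a different predicate.

```scala
object CompactionSchedule {
  // Same predicate as in the quoted first revision of the diff.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    batchId % compactInterval == 0

  // Hypothetical helper: batch ids in [0, latest] that would trigger a compaction.
  def compactionBatches(latest: Long, compactInterval: Int): Seq[Long] =
    (0L to latest).filter(isCompactionBatch(_, compactInterval))
}
```

Under this predicate batch 0 also counts as a compaction batch, which is worth keeping in mind when reasoning about the first few log files.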