[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-20 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79747197
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
 ---
@@ -79,213 +76,46 @@ object SinkFileStatus {
  * When the reader uses `allFiles` to list all files, this method only 
returns the visible files
  * (drops the deleted files).
  */
-class FileStreamSinkLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
-
-  import FileStreamSinkLog._
+class FileStreamSinkLog(
+metadataLogVersion: String,
+sparkSession: SparkSession,
+path: String)
+  extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, 
sparkSession, path) {
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
-  /**
-   * If we delete the old files after compaction at once, there is a race 
condition in S3: other
-   * processes may see the old files are deleted but still cannot see the 
compaction file using
-   * "list". The `allFiles` handles this by looking for the next 
compaction file directly, however,
-   * a live lock may happen if the compaction happens too frequently: one 
processing keeps deleting
-   * old files while another one keeps retrying. Setting a reasonable 
cleanup delay could avoid it.
-   */
-  private val fileCleanupDelayMs = 
sparkSession.sessionState.conf.fileSinkLogCleanupDelay
+  protected override val fileCleanupDelayMs =
--- End diff --

Oh, sorry about it, will fix it now.





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79668559
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
 ---
@@ -79,213 +76,46 @@ object SinkFileStatus {
  * When the reader uses `allFiles` to list all files, this method only 
returns the visible files
  * (drops the deleted files).
  */
-class FileStreamSinkLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
-
-  import FileStreamSinkLog._
+class FileStreamSinkLog(
+metadataLogVersion: String,
+sparkSession: SparkSession,
+path: String)
+  extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, 
sparkSession, path) {
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
-  /**
-   * If we delete the old files after compaction at once, there is a race 
condition in S3: other
-   * processes may see the old files are deleted but still cannot see the 
compaction file using
-   * "list". The `allFiles` handles this by looking for the next 
compaction file directly, however,
-   * a live lock may happen if the compaction happens too frequently: one 
processing keeps deleting
-   * old files while another one keeps retrying. Setting a reasonable 
cleanup delay could avoid it.
-   */
-  private val fileCleanupDelayMs = 
sparkSession.sessionState.conf.fileSinkLogCleanupDelay
+  protected override val fileCleanupDelayMs =
--- End diff --

I just noticed some conflicts here. Could you submit a follow-up PR to use the previous 
`sparkSession.sessionState.conf.fileSinkLogCleanupDelay`? Same for the other confs. This only 
exists in the master branch, so we don't need to fix branch-2.0.
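
For illustration, a minimal sketch of what that follow-up could look like inside `FileStreamSinkLog`, assuming the other sink-log settings expose `sessionState.conf` accessors with the same naming pattern (only `fileSinkLogCleanupDelay` is confirmed by the diff above); this is a sketch, not the actual follow-up patch:

```Scala
// Sketch only: read the sink-log settings through sessionState.conf,
// matching the pre-refactoring FileStreamSinkLog shown in the diff above.
protected override val fileCleanupDelayMs =
  sparkSession.sessionState.conf.fileSinkLogCleanupDelay

protected override val isDeletingExpiredLog =
  sparkSession.sessionState.conf.fileSinkLogDeletion

protected override val compactInterval =
  sparkSession.sessionState.conf.fileSinkLogCompactInterval
```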





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13513





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-19 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79530152
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.{LinkedHashMap => JLinkedHashMap}
+import java.util.Map.Entry
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+metadataLogVersion: String,
+sparkSession: SparkSession,
+path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, 
sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+  sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log entry cache to cache the file entries belong to the 
compaction batch. It is
+  // used to avoid scanning the compacted log file to retrieve it's own 
batch data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new JLinkedHashMap[Long, Array[FileEntry]] {
+override def removeEldestEntry(eldest: Entry[Long, Array[FileEntry]]): 
Boolean = {
+  size() > cacheSize
+}
+  }
+
+  protected override def serializeData(data: FileEntry): String = {
+Serialization.write(data)
+  }
+
+  protected override def deserializeData(encodedString: String): FileEntry 
= {
+Serialization.read[FileEntry](encodedString)
+  }
+
+  def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
+logs
+  }
+
+  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
+if (super.add(batchId, logs) && isCompactionBatch(batchId, 
compactInterval)) {
--- End diff --

Yes, you're right, I will fix it.





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-19 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79518675
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+metadataLogVersion: String,
+sparkSession: SparkSession,
+path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, 
sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+  sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log cache to cache the file entries belong to the 
compaction batch. It is used
+  // to avoid scanning the compacted log file to retrieve it's own batch 
data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new mutable.LinkedHashMap[Long, 
Array[FileEntry]]
+
+  private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = {
+if (fileEntryCache.size >= cacheSize) {
+  fileEntryCache.drop(1)
+}
+
+fileEntryCache.put(batchId, logs)
+  }
+
+  protected override def serializeData(data: FileEntry): String = {
+Serialization.write(data)
+  }
+
+  protected override def deserializeData(encodedString: String): FileEntry 
= {
+Serialization.read[FileEntry](encodedString)
+  }
+
+  def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
+logs
+  }
+
+  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
+if (super.add(batchId, logs) && isCompactionBatch(batchId, 
compactInterval)) {
+  updateCache(batchId, logs)
+  true
+} else if (!isCompactionBatch(batchId, compactInterval)) {
+  true
+} else {
+  false
+}
+  }
+
+  override def get(startId: Option[Long], endId: Option[Long]): 
Array[(Long, Array[FileEntry])] = {
+val startBatchId = startId.getOrElse(0L)
+val endBatchId = getLatest().map(_._1).getOrElse(0L)
+
+val (existedBatches, removedBatches) = (startBatchId to 
endBatchId).map { id =>
+  if (isCompactionBatch(id, compactInterval) && 
fileEntryCache.contains(id)) {
+(id, Some(fileEntryCache(id)))
+  } else {
+val logs = super.get(id).map(_.filter(_.batchId == id))
+(id, logs)
+  }
+}.partition(_._2.isDefined)
+
+// The below code may only be happened when original metadata log file 
has been removed, so we
+// have to get the batch from latest compacted log file. This is quite 
time-consuming and may
+// not be happened in the current FileStreamSource code path, since we 
only fetch the
+// latest metadata log file.
+val searchKeys = removedBatches.map(_._1)
+val retrievedBatches = if (searchKeys.nonEmpty) {
+  logWarning(s"Get batches from removed files, this is unexpected in 
the current code path!!!")
+  val latestBatchId = 

[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-19 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79518445
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.{LinkedHashMap => JLinkedHashMap}
+import java.util.Map.Entry
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+metadataLogVersion: String,
+sparkSession: SparkSession,
+path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, 
sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+  sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log entry cache to cache the file entries belong to the 
compaction batch. It is
+  // used to avoid scanning the compacted log file to retrieve it's own 
batch data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new JLinkedHashMap[Long, Array[FileEntry]] {
+override def removeEldestEntry(eldest: Entry[Long, Array[FileEntry]]): 
Boolean = {
+  size() > cacheSize
+}
+  }
+
+  protected override def serializeData(data: FileEntry): String = {
+Serialization.write(data)
+  }
+
+  protected override def deserializeData(encodedString: String): FileEntry 
= {
+Serialization.read[FileEntry](encodedString)
+  }
+
+  def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
+logs
+  }
+
+  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
+if (super.add(batchId, logs) && isCompactionBatch(batchId, 
compactInterval)) {
--- End diff --

This is wrong. If `super.add(batchId, logs)` is false, then we should 
always return `false`.
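
A minimal sketch of one way to restructure the override so that the result of `super.add` always drives the return value; the `fileEntryCache.put` call follows the surrounding diff, and the final fix may of course look different:

```Scala
// Sketch only: never report success when super.add failed, and update the
// entry cache only for compaction batches that were actually written.
override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
  if (super.add(batchId, logs)) {
    if (isCompactionBatch(batchId, compactInterval)) {
      fileEntryCache.put(batchId, logs)
    }
    true
  } else {
    false
  }
}
```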





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79099764
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+metadataLogVersion: String,
+sparkSession: SparkSession,
+path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, 
sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+  sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log cache to cache the file entries belong to the 
compaction batch. It is used
+  // to avoid scanning the compacted log file to retrieve it's own batch 
data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new mutable.LinkedHashMap[Long, 
Array[FileEntry]]
+
+  private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = {
+if (fileEntryCache.size >= cacheSize) {
+  fileEntryCache.drop(1)
+}
+
+fileEntryCache.put(batchId, logs)
+  }
+
+  protected override def serializeData(data: FileEntry): String = {
+Serialization.write(data)
+  }
+
+  protected override def deserializeData(encodedString: String): FileEntry 
= {
+Serialization.read[FileEntry](encodedString)
+  }
+
+  def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
+logs
+  }
+
+  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
+if (super.add(batchId, logs) && isCompactionBatch(batchId, 
compactInterval)) {
+  updateCache(batchId, logs)
+  true
+} else if (!isCompactionBatch(batchId, compactInterval)) {
+  true
+} else {
+  false
+}
+  }
+
+  override def get(startId: Option[Long], endId: Option[Long]): 
Array[(Long, Array[FileEntry])] = {
+val startBatchId = startId.getOrElse(0L)
+val endBatchId = getLatest().map(_._1).getOrElse(0L)
+
+val (existedBatches, removedBatches) = (startBatchId to 
endBatchId).map { id =>
+  if (isCompactionBatch(id, compactInterval) && 
fileEntryCache.contains(id)) {
+(id, Some(fileEntryCache(id)))
+  } else {
+val logs = super.get(id).map(_.filter(_.batchId == id))
+(id, logs)
+  }
+}.partition(_._2.isDefined)
+
+// The below code may only be happened when original metadata log file 
has been removed, so we
+// have to get the batch from latest compacted log file. This is quite 
time-consuming and may
+// not be happened in the current FileStreamSource code path, since we 
only fetch the
+// latest metadata log file.
+val searchKeys = removedBatches.map(_._1)
+val retrievedBatches = if (searchKeys.nonEmpty) {
+  logWarning(s"Get batches from removed files, this is unexpected in 
the current code path!!!")
+  val latestBatchId = 

[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79093102
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+metadataLogVersion: String,
+sparkSession: SparkSession,
+path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, 
sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+  sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log cache to cache the file entries belong to the 
compaction batch. It is used
+  // to avoid scanning the compacted log file to retrieve it's own batch 
data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new mutable.LinkedHashMap[Long, 
Array[FileEntry]]
+
+  private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = {
+if (fileEntryCache.size >= cacheSize) {
+  fileEntryCache.drop(1)
--- End diff --

I see, sorry for this issue.





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79023964
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+metadataLogVersion: String,
+sparkSession: SparkSession,
+path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, 
sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+  sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log cache to cache the file entries belong to the 
compaction batch. It is used
+  // to avoid scanning the compacted log file to retrieve it's own batch 
data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new mutable.LinkedHashMap[Long, 
Array[FileEntry]]
+
+  private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = {
+if (fileEntryCache.size >= cacheSize) {
+  fileEntryCache.drop(1)
--- End diff --

`drop` doesn't change the original map.

```Scala
scala> val m = scala.collection.mutable.LinkedHashMap[Int, Int]()
m: scala.collection.mutable.LinkedHashMap[Int,Int] = Map()

scala> 

scala> m(2) = 1

scala> m
res1: scala.collection.mutable.LinkedHashMap[Int,Int] = Map(2 -> 1)

scala> m.drop(1)
res2: scala.collection.mutable.LinkedHashMap[Int,Int] = Map()

scala> m
res3: scala.collection.mutable.LinkedHashMap[Int,Int] = Map(2 -> 1)
```

I think it should be a Java `LinkedHashMap`. Here is an example: 
https://github.com/apache/spark/blob/03d46aafe561b03e25f4e25cf01e631c18dd827c/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#L45
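
For illustration, a small self-contained sketch of the bounded-cache behaviour a Java `LinkedHashMap` gives you through `removeEldestEntry` (placeholder key/value types, not the PR's `FileEntry`):

```Scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

object BoundedCacheSketch {
  def main(args: Array[String]): Unit = {
    val cacheSize = 3
    // Eviction happens automatically inside put() once size() exceeds cacheSize.
    val cache = new JLinkedHashMap[Long, String] {
      override def removeEldestEntry(eldest: Entry[Long, String]): Boolean = {
        size() > cacheSize
      }
    }
    (0L to 4L).foreach(id => cache.put(id, s"batch-$id"))
    println(cache.keySet()) // prints [2, 3, 4]: the two eldest entries were dropped
  }
}
```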





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r79027018
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+metadataLogVersion: String,
+sparkSession: SparkSession,
+path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, 
sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+  sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed size log cache to cache the file entries belong to the 
compaction batch. It is used
+  // to avoid scanning the compacted log file to retrieve it's own batch 
data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new mutable.LinkedHashMap[Long, 
Array[FileEntry]]
+
+  private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = {
+if (fileEntryCache.size >= cacheSize) {
+  fileEntryCache.drop(1)
+}
+
+fileEntryCache.put(batchId, logs)
+  }
+
+  protected override def serializeData(data: FileEntry): String = {
+Serialization.write(data)
+  }
+
+  protected override def deserializeData(encodedString: String): FileEntry 
= {
+Serialization.read[FileEntry](encodedString)
+  }
+
+  def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
+logs
+  }
+
+  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
+if (super.add(batchId, logs) && isCompactionBatch(batchId, 
compactInterval)) {
+  updateCache(batchId, logs)
+  true
+} else if (!isCompactionBatch(batchId, compactInterval)) {
+  true
+} else {
+  false
+}
+  }
+
+  override def get(startId: Option[Long], endId: Option[Long]): 
Array[(Long, Array[FileEntry])] = {
+val startBatchId = startId.getOrElse(0L)
+val endBatchId = getLatest().map(_._1).getOrElse(0L)
+
+val (existedBatches, removedBatches) = (startBatchId to 
endBatchId).map { id =>
+  if (isCompactionBatch(id, compactInterval) && 
fileEntryCache.contains(id)) {
+(id, Some(fileEntryCache(id)))
+  } else {
+val logs = super.get(id).map(_.filter(_.batchId == id))
+(id, logs)
+  }
+}.partition(_._2.isDefined)
+
+// The below code may only be happened when original metadata log file 
has been removed, so we
+// have to get the batch from latest compacted log file. This is quite 
time-consuming and may
+// not be happened in the current FileStreamSource code path, since we 
only fetch the
+// latest metadata log file.
+val searchKeys = removedBatches.map(_._1)
+val retrievedBatches = if (searchKeys.nonEmpty) {
+  logWarning(s"Get batches from removed files, this is unexpected in 
the current code path!!!")
+  val latestBatchId = 

[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-09-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r78630727
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
 ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.IOException
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.TimeUnit
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An abstract class for compactible metadata logs. It will write one log 
file for each batch.
+ * The first line of the log file is the version number, and there are 
multiple JSON lines
+ * following.
+ *
+ * As reading from many small files is usually pretty slow, also too many
+ * small files in one folder will mess the FS, 
[[CompactibleFileStreamLog]] will
+ * compact log files every 10 batches by default into a big file. When
+ * doing a compaction, it will read all old log files and merge them with 
the new batch.
+ */
+abstract class CompactibleFileStreamLog[T: ClassTag](
+sparkSession: SparkSession,
+path: String)
+  extends HDFSMetadataLog[Array[T]](sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  /**
+   * If we delete the old files after compaction at once, there is a race 
condition in S3: other
+   * processes may see the old files are deleted but still cannot see the 
compaction file using
+   * "list". The `allFiles` handles this by looking for the next 
compaction file directly, however,
+   * a live lock may happen if the compaction happens too frequently: one 
processing keeps deleting
+   * old files while another one keeps retrying. Setting a reasonable 
cleanup delay could avoid it.
+   */
+  protected val fileCleanupDelayMs = TimeUnit.MINUTES.toMillis(10)
+
+  protected val isDeletingExpiredLog = true
+
+  protected val compactInterval = 10
+
+  /**
+   * Serialize the data into encoded string.
+   */
+  protected def serializeData(t: T): String
+
+  /**
+   * Deserialize the string into data object.
+   */
+  protected def deserializeData(encodedString: String): T
+
+  /**
+   * Filter out the unwanted logs, by default it filters out nothing, 
inherited class could
+   * override this method to do filtering.
+   */
+  protected def compactLogs(oldLogs: Seq[T], newLogs: Seq[T]): Seq[T] = {
+oldLogs ++ newLogs
+  }
+
+  override def batchIdToPath(batchId: Long): Path = {
+if (isCompactionBatch(batchId, compactInterval)) {
+  new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX")
+} else {
+  new Path(metadataPath, batchId.toString)
+}
+  }
+
+  override def pathToBatchId(path: Path): Long = {
+getBatchIdFromFileName(path.getName)
+  }
+
+  override def isBatchFile(path: Path): Boolean = {
+try {
+  getBatchIdFromFileName(path.getName)
+  true
+} catch {
+  case _: NumberFormatException => false
+}
+  }
+
+  override def serialize(logData: Array[T]): Array[Byte] = {
+(VERSION +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8)
--- End diff --

Could you make VERSION a constructor parameter so that the source and sink formats can be 
changed separately?
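
A rough sketch of the suggested shape, mirroring the `metadataLogVersion` constructor parameter that the updated code quoted earlier in this thread ends up using (the details here are illustrative):

```Scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.reflect.ClassTag
import org.apache.spark.sql.SparkSession

// Sketch only: the version string is supplied by each concrete log instead of
// a shared VERSION constant, so the source and sink formats can evolve separately.
abstract class CompactibleFileStreamLog[T: ClassTag](
    metadataLogVersion: String,
    sparkSession: SparkSession,
    path: String)
  extends HDFSMetadataLog[Array[T]](sparkSession, path) {

  protected def serializeData(t: T): String

  override def serialize(logData: Array[T]): Array[Byte] = {
    // The first line of each log file records the caller-supplied version.
    (metadataLogVersion +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8)
  }
}
```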



[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-08-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r76499068
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -129,3 +131,86 @@ class FileStreamSource(
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 }
+
+class FileStreamSourceLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[Seq[String]](sparkSession, path) {
+
+  // Configurations about metadata compaction
+  private val compactInterval = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  private val fileCleanupDelayMs = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  private val isDeletingExpiredLog = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private var compactBatchId: Long = -1L
+
+  private def isCompactionBatch(batchId: Long, compactInterval: Long): 
Boolean = {
+batchId % compactInterval == 0
+  }
+
+  override def add(batchId: Long, metadata: Seq[String]): Boolean = {
+if (isCompactionBatch(batchId, compactInterval)) {
+  compactMetadataLog(batchId - 1)
+}
+
+super.add(batchId, metadata)
+  }
+
+  private def compactMetadataLog(batchId: Long): Unit = {
+// read out compact metadata and merge with new metadata.
+val batches = super.get(Some(compactBatchId), Some(batchId))
+val totalMetadata = batches.flatMap(_._2)
+if (totalMetadata.isEmpty) {
+  return
+}
+
+// Remove old compact metadata file and rewrite.
+val renamedPath = new Path(path, 
s".${batchId.toString}-${UUID.randomUUID.toString}.tmp")
+fileManager.rename(batchIdToPath(batchId), renamedPath)
+
+var isSuccess = false
+try {
+  isSuccess = super.add(batchId, totalMetadata)
+} catch {
+  case NonFatal(e) => isSuccess = false
+} finally {
+  if (!isSuccess) {
+// Rollback to the previous status if compaction is failed.
--- End diff --

This rollback code will not execute if the process exits during a 
compaction operation. You will need cleanup code in the class constructor to 
handle that case.
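
To make the concern concrete, here is a hypothetical sketch of that kind of constructor-time cleanup, written against the plain Hadoop `FileSystem` API; the `.tmp` naming follows the diff above, but the recovery policy and how it would hook into the log class are assumptions, not the PR's actual fix:

```Scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}

// Hypothetical sketch: on startup, look for leftover ".<batchId>-<uuid>.tmp"
// backups from an interrupted compaction and either restore or discard them.
def recoverFromInterruptedCompaction(metadataDir: Path, conf: Configuration): Unit = {
  val fs = metadataDir.getFileSystem(conf)
  val tmpFilter = new PathFilter {
    override def accept(p: Path): Boolean =
      p.getName.startsWith(".") && p.getName.endsWith(".tmp")
  }
  fs.listStatus(metadataDir, tmpFilter).foreach { status =>
    // ".<batchId>-<uuid>.tmp" -> "<batchId>"
    val batchId = status.getPath.getName.stripPrefix(".").takeWhile(_ != '-')
    val compacted = new Path(metadataDir, batchId)
    if (!fs.exists(compacted)) {
      // Compaction died after the rename but before the rewrite: restore the backup.
      fs.rename(status.getPath, compacted)
    } else {
      // The rewrite completed, so the backup is stale and can be dropped.
      fs.delete(status.getPath, false)
    }
  }
}
```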





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-06-06 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r65945436
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -129,3 +131,86 @@ class FileStreamSource(
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 }
+
+class FileStreamSourceLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[Seq[String]](sparkSession, path) {
+
+  // Configurations about metadata compaction
+  private val compactInterval = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  private val fileCleanupDelayMs = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  private val isDeletingExpiredLog = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private var compactBatchId: Long = -1L
+
+  private def isCompactionBatch(batchId: Long, compactInterval: Long): 
Boolean = {
+batchId % compactInterval == 0
+  }
+
+  override def add(batchId: Long, metadata: Seq[String]): Boolean = {
+if (isCompactionBatch(batchId, compactInterval)) {
+  compactMetadataLog(batchId - 1)
+}
+
+super.add(batchId, metadata)
+  }
+
+  private def compactMetadataLog(batchId: Long): Unit = {
+// read out compact metadata and merge with new metadata.
+val batches = super.get(Some(compactBatchId), Some(batchId))
+val totalMetadata = batches.flatMap(_._2)
+if (totalMetadata.isEmpty) {
+  return
+}
+
+// Remove old compact metadata file and rewrite.
+val renamedPath = new Path(path, 
s".${batchId.toString}-${UUID.randomUUID.toString}.tmp")
+fileManager.rename(batchIdToPath(batchId), renamedPath)
+
+var isSuccess = false
+try {
+  isSuccess = super.add(batchId, totalMetadata)
+} catch {
+  case NonFatal(e) => isSuccess = false
--- End diff --

Yeah, it is not necessary, I will remove it.





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-06-05 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r65825524
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -529,7 +529,28 @@ object SQLConf {
   .internal()
   .doc("How long in milliseconds a file is guaranteed to be visible 
for all readers.")
   .timeConf(TimeUnit.MILLISECONDS)
-  .createWithDefault(60 * 1000L) // 10 minutes
+  .createWithDefault(60 * 10 * 1000L) // 10 minutes
+
+  val FILE_SOURCE_LOG_DELETION = 
SQLConfigBuilder("spark.sql.streaming.fileSource.log.deletion")
+.internal()
+.doc("Whether to delete the expired log files in file stream source.")
+.booleanConf
+.createWithDefault(true)
+
+  val FILE_SOURCE_LOG_COMPACT_INTERVAL =
+SQLConfigBuilder("spark.sql.streaming.fileSource.log.compactInterval")
+  .internal()
+  .doc("Number of log files after which all the previous files " +
+"are compacted into the next log file.")
+  .intConf
+  .createWithDefault(10)
+
+  val FILE_SOURCE_LOG_CLEANUP_DELAY =
+SQLConfigBuilder("spark.sql.streaming.fileSource.log.cleanupDelay")
+  .internal()
+  .doc("How long in milliseconds a file is guaranteed to be visible 
for all readers.")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefault(60 * 10 * 1000L) // 10 minutes
--- End diff --

A nitpick, but I think it'd be easier to "decode" as `10 * 60 * 1000L`.





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-06-05 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r65825474
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -129,3 +131,86 @@ class FileStreamSource(
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 }
+
+class FileStreamSourceLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[Seq[String]](sparkSession, path) {
+
+  // Configurations about metadata compaction
+  private val compactInterval = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  private val fileCleanupDelayMs = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  private val isDeletingExpiredLog = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private var compactBatchId: Long = -1L
+
+  private def isCompactionBatch(batchId: Long, compactInterval: Long): 
Boolean = {
+batchId % compactInterval == 0
+  }
+
+  override def add(batchId: Long, metadata: Seq[String]): Boolean = {
+if (isCompactionBatch(batchId, compactInterval)) {
+  compactMetadataLog(batchId - 1)
+}
+
+super.add(batchId, metadata)
+  }
+
+  private def compactMetadataLog(batchId: Long): Unit = {
+// read out compact metadata and merge with new metadata.
+val batches = super.get(Some(compactBatchId), Some(batchId))
+val totalMetadata = batches.flatMap(_._2)
+if (totalMetadata.isEmpty) {
+  return
+}
+
+// Remove old compact metadata file and rewrite.
+val renamedPath = new Path(path, 
s".${batchId.toString}-${UUID.randomUUID.toString}.tmp")
+fileManager.rename(batchIdToPath(batchId), renamedPath)
+
+var isSuccess = false
+try {
+  isSuccess = super.add(batchId, totalMetadata)
+} catch {
+  case NonFatal(e) => isSuccess = false
--- End diff --

Why are you setting `isSuccess` to `false` since it's `false` already?
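
In other words, the catch arm can simply swallow the failure (a sketch, assuming `scala.util.control.NonFatal` is imported as in the original file; the `finally` rollback stays as in the diff above):

```Scala
// Sketch only: isSuccess already starts out false, so the catch arm
// does not need to assign it again.
var isSuccess = false
try {
  isSuccess = super.add(batchId, totalMetadata)
} catch {
  case NonFatal(e) => // isSuccess is already false; optionally log e here
}
// ... finally { if (!isSuccess) { /* rollback as in the original diff */ } }
```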





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-06-05 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r65825480
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -129,3 +131,86 @@ class FileStreamSource(
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 }
+
+class FileStreamSourceLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[Seq[String]](sparkSession, path) {
+
+  // Configurations about metadata compaction
+  private val compactInterval = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  private val fileCleanupDelayMs = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  private val isDeletingExpiredLog = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private var compactBatchId: Long = -1L
+
+  private def isCompactionBatch(batchId: Long, compactInterval: Long): 
Boolean = {
+batchId % compactInterval == 0
+  }
+
+  override def add(batchId: Long, metadata: Seq[String]): Boolean = {
+if (isCompactionBatch(batchId, compactInterval)) {
+  compactMetadataLog(batchId - 1)
+}
+
+super.add(batchId, metadata)
+  }
+
+  private def compactMetadataLog(batchId: Long): Unit = {
+// read out compact metadata and merge with new metadata.
+val batches = super.get(Some(compactBatchId), Some(batchId))
+val totalMetadata = batches.flatMap(_._2)
+if (totalMetadata.isEmpty) {
+  return
+}
+
+// Remove old compact metadata file and rewrite.
+val renamedPath = new Path(path, 
s".${batchId.toString}-${UUID.randomUUID.toString}.tmp")
+fileManager.rename(batchIdToPath(batchId), renamedPath)
+
+var isSuccess = false
+try {
+  isSuccess = super.add(batchId, totalMetadata)
+} catch {
+  case NonFatal(e) => isSuccess = false
+} finally {
+  if (!isSuccess) {
+// Rollback to the previous status if compaction is failed.
--- End diff --

s/status/state ?





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-06-05 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r65825440
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -129,3 +131,86 @@ class FileStreamSource(
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 }
+
+class FileStreamSourceLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[Seq[String]](sparkSession, path) {
+
+  // Configurations about metadata compaction
+  private val compactInterval = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
--- End diff --

I'd move `(was $compactInterval)` to the end of the message.





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-06-04 Thread jerryshao
GitHub user jerryshao opened a pull request:

https://github.com/apache/spark/pull/13513

[SPARK-15698][SQL][Streaming] Add the ability to remove the old MetadataLog 
in FileStreamSource

## What changes were proposed in this pull request?

The current `metadataLog` in `FileStreamSource` adds a checkpoint file for each batch but has 
no ability to remove or compact them, which leads to a large number of small files when running 
for a long time. This PR proposes to compact the old logs into one file (see the sketch just 
below). The approach is quite similar to `FileStreamSinkLog`, but simpler.
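
For intuition, a tiny sketch of the compaction cadence this introduces, mirroring the `isCompactionBatch` helper in the diff (the interval value is just the default used in the PR):

```Scala
// Sketch only: batches 0, 10, 20, ... are compaction batches that fold all
// earlier entries into a single file, so at most compactInterval small log
// files accumulate between two compacted ones.
val compactInterval = 10
def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
  batchId % compactInterval == 0

assert(isCompactionBatch(0L, compactInterval))
assert(!isCompactionBatch(7L, compactInterval))
assert(isCompactionBatch(20L, compactInterval))
```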

## How was this patch tested?

Unit test added.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-15698

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13513.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13513


commit 2ed1115966fdf8b6a8fba990599b230a04e00649
Author: jerryshao 
Date:   2016-06-02T03:14:48Z

Add the ability to remove the old MetadataLog in FileStreamSource



