[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-24 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60425750
  
Let's merge this for now. I will try to find out more about the 
getFileSystem thread-safety without doAs (which is what we support anyway).
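For context, "doAs" here refers to Hadoop's `UserGroupInformation.doAs`, which resolves a `FileSystem` while impersonating another user. A minimal sketch of the two paths (the path and user name are hypothetical, purely for illustration):

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

object DoAsSketch {
  val conf = new Configuration()
  val path = new Path("/wal")  // hypothetical log directory

  // "With doAs": the FileSystem is resolved while impersonating a user.
  val ugi = UserGroupInformation.createRemoteUser("someUser")  // hypothetical user
  val fsAsUser: FileSystem = ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
    override def run(): FileSystem = path.getFileSystem(conf)
  })

  // "Without doAs" (the only case supported here): the caller's own identity is used.
  val fs: FileSystem = path.getFileSystem(conf)
}
```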





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-24 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60429942
  
Talked to @cmccabe, who says we should not worry about the thread-safety. If 
there was an issue at all, it was in a version too old for us to worry 
about. Let's merge this!





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-24 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60431494
  
Cool! Thanks for checking with @cmccabe. Merging this.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/2882





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60201254
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22067/consoleFull) for PR 2882 at commit [`3881706`](https://github.com/apache/spark/commit/38817069e66cc8c161cc2a8033873a3342cff4e2).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60208450
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22067/consoleFull) for PR 2882 at commit [`3881706`](https://github.com/apache/spark/commit/38817069e66cc8c161cc2a8033873a3342cff4e2).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`






[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60208456
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22067/
Test PASSed.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60213859
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22068/consoleFull) for PR 2882 at commit [`9514dc8`](https://github.com/apache/spark/commit/9514dc833c9c30be12eeb64fb4580c2e6f1adb4f).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60213891
  
@JoshRosen @harishreedharan Addressed all your comments, and also simplified 
the writer code. I did some further cleanups, and also added two new unit 
tests that test the writer and manager with corrupted writes. 
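A rough sketch of the shape such a corruption test can take inside the suite (the helpers `generateRandomData`, `writeDataUsingWriter`, and `byteBufferToString` mirror ones that appear in WriteAheadLogSuite below; the truncation step and the exact assertion are illustrative, not necessarily the PR's tests):

```scala
import java.io.{File, RandomAccessFile}

// Illustrative only: corrupt the tail of a written log, then check that the
// reader still returns every record that was completely written.
test("WriteAheadLogReader - reading data written with writer after corruption") {
  val file = new File(tempDirectory, "TestCorruptedWrites")
  val dataToWrite = generateRandomData()
  writeDataUsingWriter(file, dataToWrite)

  // Simulate a partial last write by chopping a few bytes off the file.
  val raf = new RandomAccessFile(file, "rw")
  raf.setLength(raf.length() - 5)
  raf.close()

  // Only the fully written records should come back.
  val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
  val readData = reader.toSeq.map(byteBufferToString)
  assert(dataToWrite.startsWith(readData))
  reader.close()
}
```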





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60219275
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22068/
Test FAILed.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60219269
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22068/consoleFull) for PR 2882 at commit [`9514dc8`](https://github.com/apache/spark/commit/9514dc833c9c30be12eeb64fb4580c2e6f1adb4f).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`






[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60274484
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22075/consoleFull) for PR 2882 at commit [`d29fddd`](https://github.com/apache/spark/commit/d29fddd880fd7efec8ed05017a12600bcb2aa829).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60286445
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22075/
Test PASSed.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60286438
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22075/consoleFull) for PR 2882 at commit [`d29fddd`](https://github.com/apache/spark/commit/d29fddd880fd7efec8ed05017a12600bcb2aa829).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`






[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60286594
  
Yay, finally! 





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60322270
  
@JoshRosen whenever you get a chance. :)





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318044
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, 
FSDataOutputStream, Path}
+
+private[streaming] object HdfsUtils {
+
+  def getOutputStream(path: String, conf: Configuration): 
FSDataOutputStream = {
+// HDFS is not thread-safe when getFileSystem is called, so 
synchronize on that
--- End diff --

It looks like this comment is no longer relevant; perhaps it should be 
moved somewhere else?





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318068
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, 
FSDataOutputStream, Path}
+
+private[streaming] object HdfsUtils {
+
+  def getOutputStream(path: String, conf: Configuration): 
FSDataOutputStream = {
+// HDFS is not thread-safe when getFileSystem is called, so 
synchronize on that
+val dfsPath = new Path(path)
+val dfs = getFileSystemForPath(dfsPath, conf)
+// If the file exists and we have append support, append instead of 
creating a new file
+val stream: FSDataOutputStream = {
+  if (dfs.isFile(dfsPath)) {
+if (conf.getBoolean("hdfs.append.support", false)) {
+  dfs.append(dfsPath)
+} else {
+  throw new IllegalStateException("File exists and there is no append support!")
+}
+  } else {
+dfs.create(dfsPath)
+  }
+}
+stream
+  }
+
+  def getInputStream(path: String, conf: Configuration): FSDataInputStream 
= {
+val dfsPath = new Path(path)
+val dfs = getFileSystemForPath(dfsPath, conf)
+val instream = dfs.open(dfsPath)
+instream
+  }
+
+  def checkState(state: Boolean, errorMsg: => String) {
+if (!state) {
+  throw new IllegalStateException(errorMsg)
+}
+  }
+
+  def getBlockLocations(path: String, conf: Configuration): 
Option[Array[String]] = {
+val dfsPath = new Path(path)
+val dfs = getFileSystemForPath(dfsPath, conf)
+val fileStatus = dfs.getFileStatus(dfsPath)
+val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, 
fileStatus.getLen))
+blockLocs.map(_.flatMap(_.getHosts))
+  }
+
+  def getFileSystemForPath(path: Path, conf: Configuration) = synchronized 
{
+val fs = path.getFileSystem(conf)
--- End diff --

Based on the old comment, does this need synchronization?





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318091
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
--- End diff --

Need an extra space before `@param`





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318155
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will 
be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every 
attempt to write to log.
+ *Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
--- End diff --

How could callerName be null?





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318248
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will 
be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every 
attempt to write to log.
+ *Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
+while (!succeeded && failures < maxFailures) {
+  try {
+fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+succeeded = true
+  } catch {
+case ex: Exception =>
+  lastException = ex
+  logWarning("Failed to write to write ahead log")
+  resetWriter()
+  failures += 1
+  }
+}
+if (fileSegment == null) {
+  logError(s"Failed to write to write ahead log after $failures failures")
+  throw lastException
+}
+fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing 
and wants
+   * to recover past  state from the write ahead logs (that is, before 
making any writes).
+   * If this is called after writes have been made using this manager, 
then it may not return
+   * the latest the records. This does not deal with 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318338
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will 
be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every 
attempt to write to log.
+ *Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
--- End diff --

Only if someone explicitly passes null, and that check can be avoided.
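In code terms, the simplification under discussion is roughly (a sketch only):

```scala
// Before: guards against an explicitly passed null, even though the
// parameter already defaults to "".
private val callerNameTag =
  if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""

// After: rely on the default "" and check only for emptiness.
private val callerNameTag =
  if (callerName.nonEmpty) s" for $callerName" else ""
```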





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318524
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, 
FSDataOutputStream, Path}
+
+private[streaming] object HdfsUtils {
+
+  def getOutputStream(path: String, conf: Configuration): 
FSDataOutputStream = {
+// HDFS is not thread-safe when getFileSystem is called, so 
synchronize on that
--- End diff --

Moved.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318631
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, 
FSDataOutputStream, Path}
+
+private[streaming] object HdfsUtils {
+
+  def getOutputStream(path: String, conf: Configuration): 
FSDataOutputStream = {
+// HDFS is not thread-safe when getFileSystem is called, so 
synchronize on that
+val dfsPath = new Path(path)
+val dfs = getFileSystemForPath(dfsPath, conf)
+// If the file exists and we have append support, append instead of 
creating a new file
+val stream: FSDataOutputStream = {
+  if (dfs.isFile(dfsPath)) {
+if (conf.getBoolean("hdfs.append.support", false)) {
+  dfs.append(dfsPath)
+} else {
+  throw new IllegalStateException("File exists and there is no append support!")
+}
+  } else {
+dfs.create(dfsPath)
+  }
+}
+stream
+  }
+
+  def getInputStream(path: String, conf: Configuration): FSDataInputStream 
= {
+val dfsPath = new Path(path)
+val dfs = getFileSystemForPath(dfsPath, conf)
+val instream = dfs.open(dfsPath)
+instream
+  }
+
+  def checkState(state: Boolean, errorMsg: => String) {
+if (!state) {
+  throw new IllegalStateException(errorMsg)
+}
+  }
+
+  def getBlockLocations(path: String, conf: Configuration): 
Option[Array[String]] = {
+val dfsPath = new Path(path)
+val dfs = getFileSystemForPath(dfsPath, conf)
+val fileStatus = dfs.getFileStatus(dfsPath)
+val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, 
fileStatus.getLen))
+blockLocs.map(_.flatMap(_.getHosts))
+  }
+
+  def getFileSystemForPath(path: Path, conf: Configuration) = synchronized 
{
+val fs = path.getFileSystem(conf)
--- End diff --

@harishreedharan Can you elaborate on why `getFileSystem` is not 
thread-safe? References?
And if it is indeed not thread-safe, then doing synchronization here does 
not solve the problem, because other threads in Spark could be accessing 
getFileSystem at the same time.
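To make the second point concrete, here is a sketch (not the PR's code) of why synchronizing inside one helper object cannot by itself protect a globally shared call:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object SynchronizationScopeSketch {
  // The lock acquired here is this object's own monitor, so it only
  // serializes callers that go through this method.
  def getFileSystemForPath(path: Path, conf: Configuration): FileSystem =
    synchronized {
      path.getFileSystem(conf)
    }

  // Any other code in the same process can still call getFileSystem
  // directly, without taking the lock above, so the two calls can race.
  def unrelatedCaller(path: Path, conf: Configuration): FileSystem =
    path.getFileSystem(conf)
}
```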





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318648
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
--- End diff --

Done.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318649
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will 
be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every 
attempt to write to log.
+ *Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
--- End diff --

Removed.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19318676
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, 
FSDataOutputStream, Path}
+
+private[streaming] object HdfsUtils {
+
+  def getOutputStream(path: String, conf: Configuration): 
FSDataOutputStream = {
+// HDFS is not thread-safe when getFileSystem is called, so 
synchronize on that
+val dfsPath = new Path(path)
+val dfs = getFileSystemForPath(dfsPath, conf)
+// If the file exists and we have append support, append instead of 
creating a new file
+val stream: FSDataOutputStream = {
+  if (dfs.isFile(dfsPath)) {
+if (conf.getBoolean("hdfs.append.support", false)) {
+  dfs.append(dfsPath)
+} else {
+  throw new IllegalStateException("File exists and there is no append support!")
+}
+  } else {
+dfs.create(dfsPath)
+  }
+}
+stream
+  }
+
+  def getInputStream(path: String, conf: Configuration): FSDataInputStream 
= {
+val dfsPath = new Path(path)
+val dfs = getFileSystemForPath(dfsPath, conf)
+val instream = dfs.open(dfsPath)
+instream
+  }
+
+  def checkState(state: Boolean, errorMsg: => String) {
+if (!state) {
+  throw new IllegalStateException(errorMsg)
+}
+  }
+
+  def getBlockLocations(path: String, conf: Configuration): 
Option[Array[String]] = {
+val dfsPath = new Path(path)
+val dfs = getFileSystemForPath(dfsPath, conf)
+val fileStatus = dfs.getFileStatus(dfsPath)
+val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, 
fileStatus.getLen))
+blockLocs.map(_.flatMap(_.getHosts))
+  }
+
+  def getFileSystemForPath(path: Path, conf: Configuration) = synchronized 
{
+val fs = path.getFileSystem(conf)
--- End diff --

@aarondav, is this that same file system issue that you mentioned to me?





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60332121
  
This looks good to me!





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60333829
  
Alright, thanks! I will merge when this last set of changes gets through 
Jenkins.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60333918
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22103/consoleFull) for PR 2882 at commit [`e4bee20`](https://github.com/apache/spark/commit/e4bee2065293d7373c43fe5636dd9971dede257e).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60338152
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22103/
Test PASSed.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60338151
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22103/consoleFull) for PR 2882 at commit [`e4bee20`](https://github.com/apache/spark/commit/e4bee2065293d7373c43fe5636dd9971dede257e).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`






[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195658
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+import scala.util.Random
+
+import WriteAheadLogSuite._
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+/**
+ * This testsuite tests all classes related to write ahead logs.
+ */
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+
+  val hadoopConf = new Configuration()
+  var tempDirectory: File = null
+
+  before {
+tempDirectory = Files.createTempDir()
+  }
+
+  after {
+if (tempDirectory != null && tempDirectory.exists()) {
+  FileUtils.deleteDirectory(tempDirectory)
+  tempDirectory = null
+}
+  }
+
+  test("WriteAheadLogWriter - writing data") {
+val file = new File(tempDirectory, Random.nextString(10))
+val dataToWrite = generateRandomData()
+val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+val segments = dataToWrite.map(data => writer.write(data))
+writer.close()
+val writtenData = readDataManually(file, segments)
+assert(writtenData.toArray === dataToWrite.toArray)
+  }
+
+  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+val file = new File(tempDirectory, Random.nextString(10))
+val dataToWrite = generateRandomData()
+val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+dataToWrite.foreach { data =>
+  val segment = writer.write(data)
+  assert(readDataManually(file, Seq(segment)).head === data)
+}
+writer.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data") {
+// Write data manually for testing the sequential reader
+val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
+val writtenData = generateRandomData()
+writeDataManually(writtenData, file)
+val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+val readData = reader.toSeq.map(byteBufferToString)
+assert(readData.toList === writtenData.toList)
+assert(reader.hasNext === false)
+intercept[Exception] {
+  reader.next()
+}
+reader.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data written with writer") {
+// Write data manually for testing the sequential reader
+val file = new File(tempDirectory, "TestWriter")
+val dataToWrite = generateRandomData()
+writeDataUsingWriter(file, dataToWrite)
+val iter = dataToWrite.iterator
+val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+reader.foreach { byteBuffer =>
+  assert(byteBufferToString(byteBuffer) === iter.next())
+}
+reader.close()
+  }
+
+  test("WriteAheadLogRandomReader - reading data using random reader") {
+// Write data manually for testing the random reader
+val file = File.createTempFile("TestRandomReads", "", tempDirectory)
+val writtenData = generateRandomData()
+val segments = writeDataManually(writtenData, file)
+
+// Get a random order of these segments and read them back
+val writtenDataAndSegments = 
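The suite's helper methods are cut off by the archive at this point. Purely for illustration, a sketch of what a writeDataManually helper could look like, assuming the length-prefixed record format the writer uses (4-byte length, then the payload); FakeSegment is a stand-in for the patch's FileSegment, not the real type:

```
import java.io.{DataOutputStream, File, FileOutputStream}

object WriteAheadLogSuiteHelpers {
  // Stand-in for the patch's FileSegment; only here so the sketch compiles.
  case class FakeSegment(path: String, offset: Long, length: Int)

  // Hypothetical helper matching the usage in the tests above: write each
  // string as a length-prefixed record and return where each record landed.
  def writeDataManually(data: Seq[String], file: File): Seq[FakeSegment] = {
    val out = new DataOutputStream(new FileOutputStream(file))
    try {
      var offset = 0L
      data.map { d =>
        val bytes = d.getBytes("UTF-8")
        out.writeInt(bytes.length)   // length prefix
        out.write(bytes)             // payload
        val segment = FakeSegment(file.toString, offset + 4, bytes.length)
        offset += 4 + bytes.length
        segment
      }
    } finally {
      out.close()
    }
  }
}
```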

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60040386
  
This is nice work overall.  I like the thorough tests, especially the 
decoupling of the writer / reader tests so that you can test the components 
separately and as part of the complete log manager system.  I left a few 
comments on the code, but I have a couple of high-level questions, too:

When is it safe to rotate / delete old logs?  In general, it seems like 
safe log compaction / deletion is application-specific and that a simple 
time-based mechanism might be unsafe.

What would happen if Spark Streaming crashed, stayed down for some multiple 
of the threshold time, then recovered?  Would we read this old portion of the 
log or would it be deleted / ignored?





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19197515
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
+ *    Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
+ *    Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
--- End diff --

Yes, I agree. Will make it more clear. 





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19197661
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
+ *    Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
+ *    Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
--- End diff --

I had thought so, and I usually do it. But for reading I felt it's easier to 
understand `if (!succeeded ...)`. Am happy to change it.
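Purely as a reading aid, here is a compilable sketch of the flag-less variant being discussed; WriterLike, FileSegmentLike and the constructor parameters are stand-ins invented for this sketch, not part of the patch:

```
import java.nio.ByteBuffer

// Stand-ins so the sketch compiles on its own.
case class FileSegmentLike(path: String, offset: Long, length: Int)
trait WriterLike { def write(data: ByteBuffer): FileSegmentLike }

class RetryingLog(getLogWriter: () => WriterLike, maxFailures: Int) {
  // Same retry behavior as the quoted writeToLog, but the loop tests
  // `fileSegment == null` directly instead of a separate `succeeded` flag.
  def writeToLog(byteBuffer: ByteBuffer): FileSegmentLike = synchronized {
    var fileSegment: FileSegmentLike = null
    var failures = 0
    var lastException: Exception = null
    while (fileSegment == null && failures < maxFailures) {
      try {
        fileSegment = getLogWriter().write(byteBuffer)
      } catch {
        case ex: Exception =>
          lastException = ex
          failures += 1
      }
    }
    if (fileSegment == null) throw lastException
    fileSegment
  }
}
```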






[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19197745
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
+ *    Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
+ *    Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
+while (!succeeded && failures < maxFailures) {
+  try {
+fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+succeeded = true
+  } catch {
+case ex: Exception =>
+  lastException = ex
+  logWarning("Failed to write to write ahead log")
+  resetWriter()
+  failures += 1
+  }
+}
+if (fileSegment == null) {
+  logError(s"Failed to write to write ahead log after $failures failures")
+  throw lastException
+}
+fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing and wants
+   * to recover past state from the write ahead logs (that is, before making any writes).
+   * If this is called after writes have been made using this manager, then it may not return
+   * the latest records. This does not deal with currently 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19197871
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
+ *    Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
+ *    Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
+while (!succeeded && failures < maxFailures) {
+  try {
+fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+succeeded = true
+  } catch {
+case ex: Exception =>
+  lastException = ex
+  logWarning("Failed to write to write ahead log")
+  resetWriter()
+  failures += 1
+  }
+}
+if (fileSegment == null) {
+  logError(s"Failed to write to write ahead log after $failures failures")
+  throw lastException
+}
+fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing and wants
+   * to recover past state from the write ahead logs (that is, before making any writes).
+   * If this is called after writes have been made using this manager, then it may not return
+   * the latest records. This does not deal with currently 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19197947
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
+  extends Closeable {
+  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
+val uri = new URI(path)
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+
+if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
+  assert(!new File(uri.getPath).exists)
+  Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
+} else {
+  Right(HdfsUtils.getOutputStream(path, hadoopConf))
+}
+  }
+
+  private lazy val hadoopFlushMethod = {
+val cls = classOf[FSDataOutputStream]
+Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
--- End diff --

Credit goes to Colin McCabe, who wrote this line:

https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/util/FileLogger.scala#L106
I stole it from there.
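For readers following along, the same trick in isolation; a sketch assuming only hadoop-common on the classpath (the HdfsFlush object and flush method are invented for illustration):

```
import scala.util.Try
import org.apache.hadoop.fs.FSDataOutputStream

object HdfsFlush {
  // Prefer hflush() (Hadoop 2.x API) and fall back to sync() (Hadoop 1.x);
  // whichever method exists on this Hadoop version is looked up once.
  private val hadoopFlushMethod = {
    val cls = classOf[FSDataOutputStream]
    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
  }

  // Invoke the discovered method reflectively; no-op if neither exists.
  def flush(stream: FSDataOutputStream): Unit =
    hadoopFlushMethod.foreach(_.invoke(stream))
}
```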





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19198008
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+import scala.util.Random
+
+import WriteAheadLogSuite._
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+/**
+ * This testsuite tests all classes related to write ahead logs.
+ */
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+
+  val hadoopConf = new Configuration()
+  var tempDirectory: File = null
+
+  before {
+tempDirectory = Files.createTempDir()
+  }
+
+  after {
+if (tempDirectory != null && tempDirectory.exists()) {
+  FileUtils.deleteDirectory(tempDirectory)
+  tempDirectory = null
+}
+  }
+
+  test("WriteAheadLogWriter - writing data") {
+val file = new File(tempDirectory, Random.nextString(10))
+val dataToWrite = generateRandomData()
+val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+val segments = dataToWrite.map(data => writer.write(data))
+writer.close()
+val writtenData = readDataManually(file, segments)
+assert(writtenData.toArray === dataToWrite.toArray)
+  }
+
+  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+val file = new File(tempDirectory, Random.nextString(10))
+val dataToWrite = generateRandomData()
+val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+dataToWrite.foreach { data =>
+  val segment = writer.write(data)
+  assert(readDataManually(file, Seq(segment)).head === data)
+}
+writer.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data") {
+// Write data manually for testing the sequential reader
+val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
+val writtenData = generateRandomData()
+writeDataManually(writtenData, file)
+val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+val readData = reader.toSeq.map(byteBufferToString)
+assert(readData.toList === writtenData.toList)
+assert(reader.hasNext === false)
+intercept[Exception] {
+  reader.next()
+}
+reader.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data written with writer") {
+// Write data manually for testing the sequential reader
+val file = new File(tempDirectory, "TestWriter")
+val dataToWrite = generateRandomData()
+writeDataUsingWriter(file, dataToWrite)
+val iter = dataToWrite.iterator
+val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+reader.foreach { byteBuffer =>
+  assert(byteBufferToString(byteBuffer) === iter.next())
+}
+reader.close()
+  }
+
+  test("WriteAheadLogRandomReader - reading data using random reader") {
+// Write data manually for testing the random reader
+val file = File.createTempFile("TestRandomReads", "", tempDirectory)
+val writtenData = generateRandomData()
+val segments = writeDataManually(writtenData, file)
+
+// Get a random order of these segments and read them back
+val writtenDataAndSegments = 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19198019
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+import scala.util.Random
+
+import WriteAheadLogSuite._
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+/**
+ * This testsuite tests all classes related to write ahead logs.
+ */
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+
+  val hadoopConf = new Configuration()
+  var tempDirectory: File = null
+
+  before {
+tempDirectory = Files.createTempDir()
+  }
+
+  after {
+if (tempDirectory != null && tempDirectory.exists()) {
+  FileUtils.deleteDirectory(tempDirectory)
+  tempDirectory = null
+}
+  }
+
+  test("WriteAheadLogWriter - writing data") {
+val file = new File(tempDirectory, Random.nextString(10))
+val dataToWrite = generateRandomData()
+val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+val segments = dataToWrite.map(data => writer.write(data))
+writer.close()
+val writtenData = readDataManually(file, segments)
+assert(writtenData.toArray === dataToWrite.toArray)
+  }
+
+  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+val file = new File(tempDirectory, Random.nextString(10))
+val dataToWrite = generateRandomData()
+val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+dataToWrite.foreach { data =>
+  val segment = writer.write(data)
+  assert(readDataManually(file, Seq(segment)).head === data)
+}
+writer.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data") {
+// Write data manually for testing the sequential reader
+val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
+val writtenData = generateRandomData()
+writeDataManually(writtenData, file)
+val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+val readData = reader.toSeq.map(byteBufferToString)
+assert(readData.toList === writtenData.toList)
+assert(reader.hasNext === false)
+intercept[Exception] {
+  reader.next()
+}
+reader.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data written with writer") {
+// Write data manually for testing the sequential reader
+val file = new File(tempDirectory, "TestWriter")
+val dataToWrite = generateRandomData()
+writeDataUsingWriter(file, dataToWrite)
+val iter = dataToWrite.iterator
+val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+reader.foreach { byteBuffer =>
+  assert(byteBufferToString(byteBuffer) === iter.next())
+}
+reader.close()
+  }
+
+  test("WriteAheadLogRandomReader - reading data using random reader") {
+// Write data manually for testing the random reader
+val file = File.createTempFile("TestRandomReads", "", tempDirectory)
+val writtenData = generateRandomData()
+val segments = writeDataManually(writtenData, file)
+
+// Get a random order of these segments and read them back
+val writtenDataAndSegments = 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60047058
  
Yes, the deletion is completely application-specific, and it is left to the 
application using the WAL to decide when to clean up. 

In the case of Spark Streaming, the scheduler keeps track of which batches 
are completely done, so any associated data + metadata can be cleaned.
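To make that contract concrete, a self-contained sketch of what threshold-based cleanup can look like; LogInfo mirrors the case class in the patch, while the class around it is invented for illustration:

```
import java.util.concurrent.Executors
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

case class LogInfo(startTime: Long, endTime: Long, path: String)

class LogCleaner(hadoopConf: Configuration) {
  private val pastLogs = new ArrayBuffer[LogInfo]
  implicit private val executionContext =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  // Delete only logs whose endTime is older than a threshold the caller
  // (e.g. the streaming scheduler) knows has been completely processed.
  def cleanupOldLogs(threshTime: Long): Unit = {
    val expired = synchronized {
      val old = pastLogs.filter(_.endTime < threshTime)
      pastLogs --= old
      old
    }
    // Delete asynchronously so slow HDFS deletes do not block the write path.
    Future {
      expired.foreach { info =>
        val path = new Path(info.path)
        path.getFileSystem(hadoopConf).delete(path, true)
      }
    }
  }
}
```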





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19241867
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
+ *    Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
+ *    Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
+while (!succeeded && failures < maxFailures) {
+  try {
+fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+succeeded = true
+  } catch {
+case ex: Exception =>
+  lastException = ex
+  logWarning("Failed to write to write ahead log")
+  resetWriter()
+  failures += 1
+  }
+}
+if (fileSegment == null) {
+  logError(s"Failed to write to write ahead log after $failures failures")
+  throw lastException
+}
+fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing and wants
+   * to recover past state from the write ahead logs (that is, before making any writes).
+   * If this is called after writes have been made using this manager, then it may not return
+   * the latest records. This does not deal with 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19241965
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
+ *    Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
+ *    Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
+while (!succeeded && failures < maxFailures) {
+  try {
+fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+succeeded = true
+  } catch {
+case ex: Exception =>
+  lastException = ex
+  logWarning("Failed to write to write ahead log")
+  resetWriter()
+  failures += 1
+  }
+}
+if (fileSegment == null) {
+  logError(s"Failed to write to write ahead log after $failures failures")
+  throw lastException
+}
+fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing and wants
+   * to recover past state from the write ahead logs (that is, before making any writes).
+   * If this is called after writes have been made using this manager, then it may not return
+   * the latest records. This does not deal with 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19242178
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+import scala.util.Random
+
+import WriteAheadLogSuite._
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+/**
+ * This testsuite tests all classes related to write ahead logs.
+ */
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+
+  val hadoopConf = new Configuration()
+  var tempDirectory: File = null
+
+  before {
+tempDirectory = Files.createTempDir()
+  }
+
+  after {
+if (tempDirectory != null && tempDirectory.exists()) {
+  FileUtils.deleteDirectory(tempDirectory)
+  tempDirectory = null
+}
+  }
+
+  test("WriteAheadLogWriter - writing data") {
+val file = new File(tempDirectory, Random.nextString(10))
+val dataToWrite = generateRandomData()
+val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+val segments = dataToWrite.map(data => writer.write(data))
+writer.close()
+val writtenData = readDataManually(file, segments)
+assert(writtenData.toArray === dataToWrite.toArray)
+  }
+
+  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+val file = new File(tempDirectory, Random.nextString(10))
+val dataToWrite = generateRandomData()
+val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+dataToWrite.foreach { data =>
+  val segment = writer.write(data)
+  assert(readDataManually(file, Seq(segment)).head === data)
+}
+writer.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data") {
+// Write data manually for testing the sequential reader
+val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
+val writtenData = generateRandomData()
+writeDataManually(writtenData, file)
+val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+val readData = reader.toSeq.map(byteBufferToString)
+assert(readData.toList === writtenData.toList)
+assert(reader.hasNext === false)
+intercept[Exception] {
+  reader.next()
+}
+reader.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data written with writer") {
+// Write data manually for testing the sequential reader
+val file = new File(tempDirectory, "TestWriter")
+val dataToWrite = generateRandomData()
+writeDataUsingWriter(file, dataToWrite)
+val iter = dataToWrite.iterator
+val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+reader.foreach { byteBuffer =>
+  assert(byteBufferToString(byteBuffer) === iter.next())
+}
+reader.close()
+  }
+
+  test("WriteAheadLogRandomReader - reading data using random reader") {
+// Write data manually for testing the random reader
+val file = File.createTempFile("TestRandomReads", "", tempDirectory)
+val writtenData = generateRandomData()
+val segments = writeDataManually(writtenData, file)
+
+// Get a random order of these segments and read them back
+val writtenDataAndSegments = 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60167866
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22046/consoleFull)
 for   PR 2882 at commit 
[`ef8db09`](https://github.com/apache/spark/commit/ef8db09075ab6d7e29a9e988bb83af16f3c553ca).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60171305
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22046/
Test FAILed.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60171298
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22046/consoleFull)
 for   PR 2882 at commit 
[`ef8db09`](https://github.com/apache/spark/commit/ef8db09075ab6d7e29a9e988bb83af16f3c553ca).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`






[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60177673
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22049/consoleFull)
 for   PR 2882 at commit 
[`eb356ca`](https://github.com/apache/spark/commit/eb356caa2bfc2ea580f656c979ce372cffe91195).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60180202
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22049/consoleFull)
 for   PR 2882 at commit 
[`eb356ca`](https://github.com/apache/spark/commit/eb356caa2bfc2ea580f656c979ce372cffe91195).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`






[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60180203
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22049/
Test FAILed.





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60180749
  
Any idea why MiniDFSCluster can't be resolved via sbt, though it works fine 
via mvn?





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60181254
  
I am running the test suite locally with sbt, and getting this!
```
[error] Uncaught exception when running org.apache.spark.streaming.util.WriteAheadLogSuite: java.lang.NoClassDefFoundError: org/mortbay/thread/ThreadPool
sbt.ForkMain$ForkError: org/mortbay/thread/ThreadPool
at 
org.apache.hadoop.hdfs.server.namenode.NameNode$1.run(NameNode.java:358)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode$1.run(NameNode.java:353)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode.startHttpServer(NameNode.java:353)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(NameNode.java:305)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:496)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode.createNameNode(NameNode.java:1279)
at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:277)
at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:124)
at 
org.apache.spark.streaming.util.WriteAheadLogSuite.<init>(WriteAheadLogSuite.scala:46)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:621)
at sbt.ForkMain$Run$2.call(ForkMain.java:294)
at sbt.ForkMain$Run$2.call(ForkMain.java:284)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: sbt.ForkMain$ForkError: org.mortbay.thread.ThreadPool
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode$1.run(NameNode.java:358)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode$1.run(NameNode.java:353)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode.startHttpServer(NameNode.java:353)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(NameNode.java:305)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:496)
at 
org.apache.hadoop.hdfs.server.namenode.NameNode.createNameNode(NameNode.java:1279)
at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:277)
at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:124)
at 
org.apache.spark.streaming.util.WriteAheadLogSuite.<init>(WriteAheadLogSuite.scala:46)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:621)
at sbt.ForkMain$Run$2.call(ForkMain.java:294)
at sbt.ForkMain$Run$2.call(ForkMain.java:284)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
```



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60181713
  
I am getting the same error as Jenkins on sbt. Looks like sbt somehow is 
not adding the minicluster jar to the classpath for the tests, though it runs fine 
in mvn and IntelliJ!





[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60183139
  
Have you done an sbt/sbt clean? I did. 
Either way, there is a real problem. The hdfs-minicluster depends on 
hdfs-core artifacts, many of whose dependencies have been excluded in 
core/pom.xml. That's because we don't want HDFS-version-specific dependencies 
to interfere with the Spark build. This is okay since we don't need to run HDFS, 
just interface with it. But in this case, since we need to run HDFS as a 
minicluster, those exclusions are a problem. 

There are probably two possible solutions:
1. Based on a maven profile, re-include the excluded dependencies for tests and 
run the mini-cluster. This is going to be painfully complicated.
2. Not use the mini-cluster, and instead have two different code paths for 
the local file system and HDFS (a rough sketch follows below). More complex code, 
but simpler to maintain, though the unit test will not really test HDFS flushes 
correctly.
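
For what it's worth, option (2) in code would look roughly like the local/HDFS 
split already sketched in this PR's WriteAheadLogWriter. This is only an 
illustration; `path`, `hadoopConf`, and `HdfsUtils` are assumed from the PR's code:

```scala
import java.io.{BufferedOutputStream, DataOutputStream, FileOutputStream}
import java.net.URI
import org.apache.hadoop.fs.FSDataOutputStream

// Pick the stream by URI scheme: plain java.io for local files,
// Hadoop's FSDataOutputStream (which supports hflush/sync) for HDFS.
val uri = new URI(path)
val underlyingStream: Either[DataOutputStream, FSDataOutputStream] =
  if (uri.getScheme == null || uri.getScheme == "file") {
    Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
  } else {
    Right(HdfsUtils.getOutputStream(path, hadoopConf))
  }
```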


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60183932
  
(1) is not an issue at all. mvn will exclude any dependencies whose versions 
are hard-coded, so in any profile the minicluster will pull in the correct 
versions, and we don't bundle it since we don't actually package it. Exclusions 
don't get in the way, which is why the maven build works fine (see Flume, for 
example: we build against any arbitrary HDFS version and use the minicluster in 
our tests without issues; other projects also build against arbitrary HDFS 
versions and still use the minicluster).

I don't like (2): the minicluster caught test issues which we didn't see in the 
local tests, so I'd rather keep it.

This definitely seems like an sbt-maven resolver issue. Since the tests 
are fine on mvn, it looks like the new top-level dependencies are somehow not 
getting pulled in. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60184378
  
Even after sbt clean I am getting the same error:
```
[error] /Users/hshreedharan/work/spark-mvn/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala:34: object MiniDFSCluster is not a member of package org.apache.hadoop.hdfs
[error] import org.apache.hadoop.hdfs.MiniDFSCluster
[error]        ^
[error] /Users/hshreedharan/work/spark-mvn/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala:46: not found: type MiniDFSCluster
[error]   val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
[error]                     ^
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[error] two errors found
```
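
For reference, this is roughly what pulling the minicluster into the streaming 
module's test classpath would look like on the sbt side. A hypothetical sketch 
only, assuming a `hadoopVersion` value like the one the build already selects 
via environment variables:

```scala
// Hypothetical build fragment: hadoop-minicluster bundles MiniDFSCluster and
// its runtime dependencies; "test" scope keeps it out of the Spark packaging.
val hadoopVersion = sys.env.getOrElse("SPARK_HADOOP_VERSION", "2.4.0")

libraryDependencies += "org.apache.hadoop" % "hadoop-minicluster" % hadoopVersion % "test"
```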


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60184841
  
Which profile are you running with?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-22 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60185449
  
I have the env vars set to use hadoop-2.4


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-59993367
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22006/consoleFull)
 for   PR 2882 at commit 
[`4ab602a`](https://github.com/apache/spark/commit/4ab602a0074a2144d33367229358c19d079798d8).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60004014
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22006/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2882#issuecomment-60004008
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22006/consoleFull)
 for   PR 2882 at commit 
[`4ab602a`](https://github.com/apache/spark/commit/4ab602a0074a2144d33367229358c19d079798d8).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19184821
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: 
Configuration)
+  extends Closeable {
+  private val underlyingStream: Either[DataOutputStream, 
FSDataOutputStream] = {
--- End diff --

WIP: this file is going to be updated by @harishreedharan to get rid of the 
local file customizations.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19190251
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+private[streaming] case class FileSegment (path: String, offset: Long, 
length: Int)
--- End diff --

Seems we also have the same case class `FileSegment` in core/storage 
module. can we just use that one, or we have to build our own?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19190537
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+private[streaming] case class FileSegment (path: String, offset: Long, 
length: Int)
--- End diff --

Good point. But that FileSegment is for local files and uses Java's File to 
store the file name, so we can't use that one. And I did not want to change anything 
in core because of streaming.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19190926
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+private[streaming] case class FileSegment (path: String, offset: Long, 
length: Int)
--- End diff --

So maybe we can change it to another name, I think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19190985
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+private[streaming] case class FileSegment (path: String, offset: Long, 
length: Int)
--- End diff --

How about `WriteAheadLogFileSegment`?
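
For illustration, the rename itself would be a one-liner, keeping the fields 
from the diff:

```scala
private[streaming] case class WriteAheadLogFileSegment(path: String, offset: Long, length: Int)
```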


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19191068
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+private[streaming] case class FileSegment (path: String, offset: Long, 
length: Int)
--- End diff --

Yeah, that would be fine :).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19193639
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will 
be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every 
attempt to write to log.
+ *Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName " else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
--- End diff --

I think that `succeeded` implies `fileSegment != null` and vice-versa, so 
you probably don't need this variable.
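
A minimal sketch of the loop without the flag, reusing the names from the 
quoted diff (`getLogWriter`, `resetWriter`, etc. are assumed from the 
surrounding class):

```scala
// fileSegment == null doubles as the "not yet succeeded" condition,
// so the separate succeeded flag goes away.
var fileSegment: FileSegment = null
var failures = 0
var lastException: Exception = null
while (fileSegment == null && failures < maxFailures) {
  try {
    fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
  } catch {
    case ex: Exception =>
      lastException = ex
      logWarning("Failed to write to write ahead log")
      resetWriter()
      failures += 1
  }
}
if (fileSegment == null) {
  logError(s"Failed to write to write ahead log after $failures failures")
  throw lastException
}
fileSegment
```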


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19193668
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will 
be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every 
attempt to write to log.
+ *Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName " else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
+while (!succeeded && failures < maxFailures) {
+  try {
+fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+succeeded = true
+  } catch {
+case ex: Exception =>
+  lastException = ex
+  logWarning("Failed to write to write ahead log")
+  resetWriter()
+  failures += 1
+  }
+}
+if (fileSegment == null) {
+  logError(s"Failed to write to write ahead log after $failures failures")
+  throw lastException
+}
+fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing 
and wants
+   * to recover past  state from the write ahead logs (that is, before 
making any writes).
--- End diff --

Extra space after "past".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19193702
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will 
be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every 
attempt to write to log.
+ *Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName " else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
+while (!succeeded && failures < maxFailures) {
+  try {
+fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+succeeded = true
+  } catch {
+case ex: Exception =>
+  lastException = ex
+  logWarning("Failed to write to write ahead log")
+  resetWriter()
+  failures += 1
+  }
+}
+if (fileSegment == null) {
+  logError(s"Failed to write to write ahead log after $failures failures")
+  throw lastException
+}
+fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing 
and wants
+   * to recover past  state from the write ahead logs (that is, before 
making any writes).
+   * If this is called after writes have been made using this manager, 
then it may not return
+   * the latest the records. This does not deal with 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19193688
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will 
be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every 
attempt to write to log.
+ *Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName " else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
+while (!succeeded && failures < maxFailures) {
+  try {
+fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+succeeded = true
+  } catch {
+case ex: Exception =>
+  lastException = ex
+  logWarning("Failed to write to write ahead log")
+  resetWriter()
+  failures += 1
+  }
+}
+if (fileSegment == null) {
+  logError(s"Failed to write to write ahead log after $failures failures")
+  throw lastException
+}
+fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing 
and wants
+   * to recover past  state from the write ahead logs (that is, before 
making any writes).
+   * If this is called after writes have been made using this manager, 
then it may not return
+   * the latest the records. This does not deal with 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19193795
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will 
be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every 
attempt to write to log.
+ *Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName " else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
+while (!succeeded && failures < maxFailures) {
+  try {
+fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+succeeded = true
+  } catch {
+case ex: Exception =>
+  lastException = ex
+  logWarning("Failed to write to write ahead log")
+  resetWriter()
+  failures += 1
+  }
+}
+if (fileSegment == null) {
+  logError(s"Failed to write to write ahead log after $failures failures")
+  throw lastException
+}
+fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing 
and wants
+   * to recover past  state from the write ahead logs (that is, before 
making any writes).
+   * If this is called after writes have been made using this manager, 
then it may not return
+   * the latest the records. This does not deal with 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19193790
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ *@param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will 
be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every 
attempt to write to log.
+ *Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName " else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+var fileSegment: FileSegment = null
+var failures = 0
+var lastException: Exception = null
+var succeeded = false
+while (!succeeded && failures < maxFailures) {
+  try {
+fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+succeeded = true
+  } catch {
+case ex: Exception =>
+  lastException = ex
+  logWarning("Failed to write to write ahead log")
+  resetWriter()
+  failures += 1
+  }
+}
+if (fileSegment == null) {
+  logError(s"Failed to write to write ahead log after $failures failures")
+  throw lastException
+}
+fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing 
and wants
+   * to recover past  state from the write ahead logs (that is, before 
making any writes).
+   * If this is called after writes have been made using this manager, 
then it may not return
+   * the latest the records. This does not deal with 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19193881
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{Closeable, EOFException}
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.Logging
+
+/**
+ * A reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
+ * the records (bytebuffers) in the log file sequentially and return them 
as an
+ * iterator of bytebuffers.
+ */
+private[streaming] class WriteAheadLogReader(path: String, conf: 
Configuration)
+  extends Iterator[ByteBuffer] with Closeable with Logging {
+
+  private val instream = HdfsUtils.getInputStream(path, conf)
+  private var closed = false
+  private var nextItem: Option[ByteBuffer] = None
+
+  override def hasNext: Boolean = synchronized {
+if (closed) {
+  return false
+}
+
+if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
+  true
+} else {
+  try {
+val length = instream.readInt()
+val buffer = new Array[Byte](length)
+instream.readFully(buffer)
+nextItem = Some(ByteBuffer.wrap(buffer))
+logTrace("Read next item " + nextItem.get)
+true
+  } catch {
+case e: EOFException =>
+  logDebug("Error reading next item, EOF reached", e)
+  close()
+  false
+case e: Exception =>
+  logDebug("Error reading next item, EOF reached", e)
--- End diff --

This error message should probably be different in order to distinguish it 
from the EOFException case.
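
One possible way to separate the two cases; the second message here is just an 
assumed example, not the wording the PR ends up with:

```scala
case e: EOFException =>
  logDebug("Error reading next item, EOF reached", e)
  close()
  false
case e: Exception =>
  // Distinct message, so a genuine read failure is not mistaken for a clean EOF.
  logDebug("Error reading next item from the write ahead log file", e)
  close()
  false
```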


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19194043
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: 
Configuration)
+  extends Closeable {
+  private val underlyingStream: Either[DataOutputStream, 
FSDataOutputStream] = {
--- End diff --

Ah, that makes sense.  I guess you can still use the HDFS API to write to 
local files for testing purposes.
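
Right, the Hadoop FileSystem API resolves `file://` URIs to its local filesystem 
implementation, so tests can exercise the same code path without HDFS. A small 
illustrative sketch (the path here is made up):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// A file:// path resolves to Hadoop's local FileSystem, so the same
// FSDataOutputStream-based writer code can be tested without a cluster.
val path = new Path("file:///tmp/wal-test/log-0")
val fs = path.getFileSystem(new Configuration())
val out = fs.create(path)
out.writeInt(42)
out.close()
```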


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195094
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: 
Configuration)
+  extends Closeable {
+  private val underlyingStream: Either[DataOutputStream, 
FSDataOutputStream] = {
+val uri = new URI(path)
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+
+if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
+  assert(!new File(uri.getPath).exists)
+  Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
+} else {
+  Right(HdfsUtils.getOutputStream(path, hadoopConf))
+}
+  }
+
+  private lazy val hadoopFlushMethod = {
+val cls = classOf[FSDataOutputStream]
+Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
--- End diff --

Nice Scala one-liner :)

Why do we need this reflection, though?  Is this necessary to support 
multiple Hadoop versions?  If so, could you add a one-line comment to explain 
this?
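
My understanding (worth confirming) is that this is indeed for cross-version 
support: `hflush()` exists on FSDataOutputStream in Hadoop 2.x, while older 
1.x releases only expose `sync()`. The resolved method would then be applied 
per write, roughly like:

```scala
// Resolve whichever flush method this Hadoop version provides, once.
private lazy val hadoopFlushMethod = {
  val cls = classOf[FSDataOutputStream]
  Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
}

// Sketch of use, assuming `stream` is the underlying FSDataOutputStream:
hadoopFlushMethod.foreach(_.invoke(stream))
```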


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195209
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+private[streaming] case class FileSegment (path: String, offset: Long, 
length: Int)
--- End diff --

Will make the change in the next update.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195244
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds at which logs will be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures tolerated for every attempt to write to the log.
+ *Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = 
ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
--- End diff --

It might be nice to explicitly state that this write is synchronous / 
blocking, if that's the case.  If I call this and it returns, am I guaranteed 
that my log record is durably stored in HDFS and readable during recovery?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195251
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{Closeable, EOFException}
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.Logging
+
+/**
+ * A reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
+ * the records (bytebuffers) in the log file sequentially and returns them as an
+ * iterator of bytebuffers.
+ */
+private[streaming] class WriteAheadLogReader(path: String, conf: 
Configuration)
+  extends Iterator[ByteBuffer] with Closeable with Logging {
+
+  private val instream = HdfsUtils.getInputStream(path, conf)
+  private var closed = false
+  private var nextItem: Option[ByteBuffer] = None
+
+  override def hasNext: Boolean = synchronized {
+if (closed) {
+  return false
+}
+
+if (nextItem.isDefined) { // handle the case where hasNext is called 
without calling next
+  true
+} else {
+  try {
+val length = instream.readInt()
+val buffer = new Array[Byte](length)
+instream.readFully(buffer)
+nextItem = Some(ByteBuffer.wrap(buffer))
+logTrace("Read next item " + nextItem.get)
+true
+  } catch {
+case e: EOFException =>
+  logDebug("Error reading next item, EOF reached", e)
+  close()
+  false
+case e: Exception =>
+  logDebug("Error reading next item, EOF reached", e)
--- End diff --

Will fix in the next iter


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195267
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: 
Configuration)
+  extends Closeable {
+  private val underlyingStream: Either[DataOutputStream, 
FSDataOutputStream] = {
--- End diff --

Yep. And for all tests, we are just going to use Hadoop Minicluster anyway.
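Roughly along these lines (just a sketch of such a fixture; it assumes the hadoop-minicluster test artifact is on the classpath, and the helper name is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.MiniDFSCluster

object MiniClusterFixture {
  // Run a test body against a single-node in-process HDFS cluster and
  // hand it an hdfs:// directory that the WAL classes can write under.
  def withMiniCluster(body: String => Unit): Unit = {
    val cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).build()
    try {
      cluster.waitClusterUp()
      body(s"hdfs://localhost:${cluster.getNameNodePort}/wal")
    } finally {
      cluster.shutdown()
    }
  }
}
```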


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195316
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: 
Configuration)
+  extends Closeable {
+  private val underlyingStream: Either[DataOutputStream, 
FSDataOutputStream] = {
+val uri = new URI(path)
+val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+val isDefaultLocal = defaultFs == null || defaultFs == "file"
+
+if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
+  assert(!new File(uri.getPath).exists)
+  Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
+} else {
+  Right(HdfsUtils.getOutputStream(path, hadoopConf))
+}
+  }
+
+  private lazy val hadoopFlushMethod = {
+val cls = classOf[FSDataOutputStream]
+
Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
--- End diff --

Actually we do, since Spark currently supports everything from Hadoop 1 through Hadoop 2.5.0. In Hadoop 1.x, the sync method did the same thing that hflush does in 2.5.0 - so, in short, we do need the reflection.
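
To spell out what it buys us, here is a minimal sketch (the object and method names are mine, not the PR's) of resolving and invoking whichever flush method the Hadoop version on the classpath provides:

```scala
import java.lang.reflect.Method
import scala.util.Try
import org.apache.hadoop.fs.FSDataOutputStream

object HflushCompat {
  // Hadoop 2.x exposes hflush(); in Hadoop 1.x the equivalent call was
  // sync(). Resolve whichever one exists on the current classpath, once.
  private lazy val hadoopFlushMethod: Option[Method] = {
    val cls = classOf[FSDataOutputStream]
    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
  }

  // Invoke the resolved method; fall back to a plain flush() if neither
  // reflective lookup succeeded.
  def durableFlush(stream: FSDataOutputStream): Unit = hadoopFlushMethod match {
    case Some(m) => m.invoke(stream)
    case None    => stream.flush()
  }
}
```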


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195339
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+import scala.util.Random
+
+import WriteAheadLogSuite._
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+/**
+ * This testsuite tests all classes related to write ahead logs.
+ */
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+
+  val hadoopConf = new Configuration()
+  var tempDirectory: File = null
+
+  before {
+tempDirectory = Files.createTempDir()
+  }
+
+  after {
+if (tempDirectory != null && tempDirectory.exists()) {
+  FileUtils.deleteDirectory(tempDirectory)
+  tempDirectory = null
+}
+  }
+
+  test("WriteAheadLogWriter - writing data") {
+val file = new File(tempDirectory, Random.nextString(10))
--- End diff --

Minor nitpick, but it looks like the tests use a few different techniques 
for creating files in the temp directory:

- `val file = new File(tempDirectory, Random.nextString(10))`
- `val file = File.createTempFile("TestSequentialReads", "", tempDirectory)`
- `val file = new File(tempDirectory, "TestWriter")`

For consistency, we should pick one method.
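
e.g. a single shared helper along these lines (the name and naming scheme are illustrative, not from the PR):

```scala
import java.io.File
import scala.util.Random

object WriteAheadLogTestUtils {
  // One shared way to create test files; alphanumeric names also avoid the
  // occasionally-invalid filename characters that Random.nextString(10)
  // can produce.
  def newLogFile(tempDirectory: File): File =
    new File(tempDirectory, s"test-${Random.alphanumeric.take(10).mkString}")
}
```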




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195364
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon 
failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds at which logs will be rolled over.
+ *Default is one minute.
+ * @param maxFailures Max number of failures tolerated for every attempt to write to the log.
+ *Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+logDirectory: String,
+hadoopConf: Configuration,
+rollingIntervalSecs: Int = 60,
+maxFailures: Int = 3,
+callerName: String = "",
+clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = 
ExecutionContext.fromExecutorService(
+Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /** Write a byte buffer to the log file */
--- End diff --

Yes, that is right. I believe we are lacking Scaladocs like this everywhere - we should add them, I guess.
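
For example, something along these lines (just a sketch of the guarantee described above; the trait name is made up, and FileSegment mirrors the case class added in this PR):

```scala
import java.nio.ByteBuffer

// Mirrors the FileSegment case class added in this PR.
case class FileSegment(path: String, offset: Long, length: Int)

trait WalWriteContract {
  /**
   * Write a byte buffer to the log file.
   *
   * This call is synchronous: it blocks until the record has been written
   * and flushed (hflush/sync) to the log. Once it returns, the returned
   * FileSegment points to a durably stored record that can be read back
   * during recovery.
   */
  def writeToLog(byteBuffer: ByteBuffer): FileSegment
}
```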


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195386
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+import scala.util.Random
+
+import WriteAheadLogSuite._
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+/**
+ * This testsuite tests all classes related to write ahead logs.
+ */
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+
+  val hadoopConf = new Configuration()
+  var tempDirectory: File = null
+
+  before {
+tempDirectory = Files.createTempDir()
+  }
+
+  after {
+if (tempDirectory != null && tempDirectory.exists()) {
+  FileUtils.deleteDirectory(tempDirectory)
+  tempDirectory = null
+}
+  }
+
+  test("WriteAheadLogWriter - writing data") {
+val file = new File(tempDirectory, Random.nextString(10))
--- End diff --

I am refactoring much of the tests right now, so we won't use local files at all - we should be ok.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195389
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+import scala.util.Random
+
+import WriteAheadLogSuite._
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+/**
+ * This testsuite tests all classes related to write ahead logs.
+ */
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+
+  val hadoopConf = new Configuration()
+  var tempDirectory: File = null
+
+  before {
+tempDirectory = Files.createTempDir()
+  }
+
+  after {
+if (tempDirectory != null && tempDirectory.exists()) {
+  FileUtils.deleteDirectory(tempDirectory)
+  tempDirectory = null
+}
+  }
+
+  test("WriteAheadLogWriter - writing data") {
+val file = new File(tempDirectory, Random.nextString(10))
+val dataToWrite = generateRandomData()
+val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+val segments = dataToWrite.map(data => writer.write(data))
+writer.close()
+val writtenData = readDataManually(file, segments)
+assert(writtenData.toArray === dataToWrite.toArray)
+  }
+
+  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+val file = new File(tempDirectory, Random.nextString(10))
+val dataToWrite = generateRandomData()
+val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+dataToWrite.foreach { data =>
+  val segment = writer.write(data)
+  assert(readDataManually(file, Seq(segment)).head === data)
+}
+writer.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data") {
+// Write data manually for testing the sequential reader
+val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
+val writtenData = generateRandomData()
+writeDataManually(writtenData, file)
+val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+val readData = reader.toSeq.map(byteBufferToString)
+assert(readData.toList === writtenData.toList)
+assert(reader.hasNext === false)
+intercept[Exception] {
+  reader.next()
+}
+reader.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data written with writer") {
+// Write data manually for testing the sequential reader
+val file = new File(tempDirectory, "TestWriter")
+val dataToWrite = generateRandomData()
+writeDataUsingWriter(file, dataToWrite)
+val iter = dataToWrite.iterator
+val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+reader.foreach { byteBuffer =>
+  assert(byteBufferToString(byteBuffer) === iter.next())
+}
+reader.close()
+  }
+
+  test("WriteAheadLogRandomReader - reading data using random reader") {
+// Write data manually for testing the random reader
+val file = File.createTempFile("TestRandomReads", "", tempDirectory)
+val writtenData = generateRandomData()
+val segments = writeDataManually(writtenData, file)
+
+// Get a random order of these segments and read them back
+val writtenDataAndSegments = 

[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195483
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
+
+private[streaming] object HdfsUtils {
+
+  def getOutputStream(path: String, conf: Configuration): 
FSDataOutputStream = {
+// HDFS is not thread-safe when getFileSystem is called, so 
synchronize on that
+
+val dfsPath = new Path(path)
+val dfs =
--- End diff --

Minor style nit but the extra newline here looks strange.  I'd put the 
`this.synchronized` on this line, too.
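
i.e. roughly (just a sketch of the suggested layout):

```scala
val dfs = this.synchronized { dfsPath.getFileSystem(conf) }
```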


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195548
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
+
+private[streaming] object HdfsUtils {
+
+  def getOutputStream(path: String, conf: Configuration): 
FSDataOutputStream = {
+// HDFS is not thread-safe when getFileSystem is called, so 
synchronize on that
+
+val dfsPath = new Path(path)
+val dfs =
+  this.synchronized {
+dfsPath.getFileSystem(conf)
+  }
+// If the file exists and we have append support, append instead of 
creating a new file
+val stream: FSDataOutputStream = {
+  if (dfs.isFile(dfsPath)) {
+if (conf.getBoolean("hdfs.append.support", false)) {
+  dfs.append(dfsPath)
+} else {
+  throw new IllegalStateException("File exists and there is no append support!")
+}
+  } else {
+dfs.create(dfsPath)
+  }
+}
+stream
+  }
+
+  def getInputStream(path: String, conf: Configuration): FSDataInputStream 
= {
+val dfsPath = new Path(path)
+val dfs = this.synchronized {
+  dfsPath.getFileSystem(conf)
+}
+val instream = dfs.open(dfsPath)
+instream
+  }
+
+  def checkState(state: Boolean, errorMsg: => String) {
+if(!state) {
--- End diff --

Space after the `if`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2882#discussion_r19195551
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
+
+private[streaming] object HdfsUtils {
+
+  def getOutputStream(path: String, conf: Configuration): 
FSDataOutputStream = {
+// HDFS is not thread-safe when getFileSystem is called, so 
synchronize on that
+
+val dfsPath = new Path(path)
+val dfs =
+  this.synchronized {
+dfsPath.getFileSystem(conf)
+  }
+// If the file exists and we have append support, append instead of 
creating a new file
+val stream: FSDataOutputStream = {
+  if (dfs.isFile(dfsPath)) {
+if (conf.getBoolean("hdfs.append.support", false)) {
+  dfs.append(dfsPath)
+} else {
+  throw new IllegalStateException("File exists and there is no append support!")
+}
+  } else {
+dfs.create(dfsPath)
+  }
+}
+stream
+  }
+
+  def getInputStream(path: String, conf: Configuration): FSDataInputStream 
= {
+val dfsPath = new Path(path)
+val dfs = this.synchronized {
+  dfsPath.getFileSystem(conf)
+}
+val instream = dfs.open(dfsPath)
+instream
+  }
+
+  def checkState(state: Boolean, errorMsg: => String) {
+if(!state) {
+  throw new IllegalStateException(errorMsg)
+}
+  }
+
+  def getBlockLocations(path: String, conf: Configuration): 
Option[Array[String]] = {
+val dfsPath = new Path(path)
+val dfs =
--- End diff --

Same here; I'd move the next line onto this line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org