[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18200173
  
--- Diff: bin/pyspark ---
@@ -87,11 +87,7 @@ export PYSPARK_SUBMIT_ARGS
 if [[ -n $SPARK_TESTING ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
-  if [[ -n $PYSPARK_DOC_TEST ]]; then
-exec $PYSPARK_PYTHON -m doctest $1
-  else
-exec $PYSPARK_PYTHON $1
-  fi
+  exec $PYSPARK_PYTHON $1
--- End diff --

If that is easy to do, then that is a better idea. Since this PR is already 
so big, let's change as little of the existing infrastructure as possible. 
Otherwise, if existing PySpark breaks in some weird way, it will be hard to 
revert this commit without reverting all of PySpark Streaming.





[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18200181
  
--- Diff: python/pyspark/accumulators.py ---
@@ -256,3 +256,8 @@ def _start_update_server():
 thread.daemon = True
 thread.start()
 return server
+
+
+if __name__ == "__main__":
--- End diff --

Same logic as I mentioned earlier. If it is easy to undo the refactoring and 
still be able to run the tests, let's try to do that. 





[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18200194
  
--- Diff: python/pyspark/serializers.py ---
@@ -114,6 +114,9 @@ def __ne__(self, other):
 def __repr__(self):
 return "%s object" % self.__class__.__name__
 
+def __hash__(self):
+return hash(str(self))
--- End diff --

Similar question: are the changes in this file necessary for streaming, or 
were they part of the refactoring?





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18200248
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+peerFetchLock.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
--- End diff --

I didn't want to pollute the namespace inside the BlockManager class any 
more than absolutely necessary. :)





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18200286
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -111,6 +112,9 @@ private[spark] class BlockManager(
 MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, 
conf)
   private val broadcastCleaner = new MetadataCleaner(
 MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
+  @volatile private var cachedPeers: Seq[BlockManagerId] = _
--- End diff --

Good idea. Should have done that myself.





[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...

2014-09-30 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/2590#discussion_r18200306
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.PackratParsers
+import scala.util.parsing.input.CharArrayReader.EofCh
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.SqlLexical
+import scala.util.parsing.combinator.lexical.StdLexical
+
+/**
+ * A simple Hive SQL pre parser. It parses the commands like cache,uncache 
etc and 
+ * remaining actual query will be parsed by HiveQl.parseSql 
+ */
--- End diff --

Since this class takes over all HiveQL parsing work (although it delegates 
to an underlying Hive parser), it's not accurate to call it a pre-parser. 
Maybe something like this:

 A parser that recognizes all HiveQL constructs together with several 
Spark SQL specific extensions like `CACHE TABLE` and `UNCACHE TABLE`.





[GitHub] spark pull request: [SPARK-3412][SQL]add missing row api

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2529#issuecomment-57270939
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21023/





[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...

2014-09-30 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/2590#discussion_r18200319
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.PackratParsers
+import scala.util.parsing.input.CharArrayReader.EofCh
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.SqlLexical
+import scala.util.parsing.combinator.lexical.StdLexical
+
+/**
+ * A simple Hive SQL pre parser. It parses the commands like cache,uncache 
etc and 
+ * remaining actual query will be parsed by HiveQl.parseSql 
+ */
+class HiveSqlParser extends StandardTokenParsers with PackratParsers {  
--- End diff --

Maybe `ExtendedHiveQLParser` is a better name, since HiveQL is the 
definitive name and we're adding extensions to it in Spark SQL.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18200328
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+peerFetchLock.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+  val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+  if (cachedPeers == null || forceFetch || timeout) {
+cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+  }
+  cachedPeers
+}
+  }
+
+  /**
* Replicate block to another node.
--- End diff --

Done.





[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...

2014-09-30 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/2590#discussion_r18200330
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.PackratParsers
+import scala.util.parsing.input.CharArrayReader.EofCh
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.SqlLexical
+import scala.util.parsing.combinator.lexical.StdLexical
+
+/**
+ * A simple Hive SQL pre parser. It parses the commands like cache,uncache 
etc and 
+ * remaining actual query will be parsed by HiveQl.parseSql 
+ */
+class HiveSqlParser extends StandardTokenParsers with PackratParsers {  
+  
+   def apply(input: String): LogicalPlan = {
+// Special-case out set commands since the value fields can be
+// complex to handle without RegexParsers. Also this approach
+// is clearer for the several possible cases of set commands.
+if (input.trim.toLowerCase.startsWith("set")) {
--- End diff --

I'd like to handle `SET` in the parser combinators too, but we can leave this 
to another PR.
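
For illustration, a minimal standalone sketch (not the PR's code) of what a 
combinator-based `SET` rule could look like; `SetCmd` here is a stand-in for 
Spark SQL's `SetCommand`, and `RegexParsers` is used to cope with the 
free-form value field:

```
import scala.util.parsing.combinator.RegexParsers

// Stand-in for Spark SQL's SetCommand logical plan node.
case class SetCmd(key: Option[String], value: Option[String])

object SetParser extends RegexParsers {
  private val SET = "(?i)set".r                       // case-insensitive keyword
  private val key = """[^=\s][^=]*""".r ^^ (_.trim)   // anything up to '='
  private val value = """.*""".r ^^ (_.trim)          // the rest of the line

  val set: Parser[SetCmd] = SET ~> opt(key ~ opt("=" ~> value)) ^^ {
    case None              => SetCmd(None, None)      // SET
    case Some(k ~ None)    => SetCmd(Some(k), None)   // SET key
    case Some(k ~ Some(v)) => SetCmd(Some(k), Some(v))// SET key=value
  }

  def apply(input: String): SetCmd = parseAll(set, input) match {
    case Success(cmd, _) => cmd
    case failure         => sys.error(failure.toString)
  }
}

// SetParser("SET spark.sql.shuffle.partitions=10")
//   == SetCmd(Some("spark.sql.shuffle.partitions"), Some("10"))
```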





[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...

2014-09-30 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/2590#discussion_r18200397
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---
@@ -75,6 +75,9 @@ class LocalHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
  */
 class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   self =
+
+  @transient
+  protected[sql] val hiveParser = new HiveSqlParser  
--- End diff --

Can we move this to `HiveQl`? It seems more reasonable to put all HiveQL 
parsing logic together.
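
By way of illustration only, a tiny self-contained sketch of the layout being 
suggested; the stub types below stand in for the real Spark SQL classes, and 
the method names are placeholders:

```
// Stubs standing in for Catalyst's LogicalPlan and the parser added in this PR.
trait LogicalPlan
class HiveSqlParser {
  def apply(sql: String): LogicalPlan = new LogicalPlan {}
}

// The extended parser lives in HiveQl, next to the rest of the HiveQL parsing logic...
object HiveQl {
  lazy val extendedParser = new HiveSqlParser
  def parseSql(sql: String): LogicalPlan = extendedParser(sql)
}

// ...and the context simply delegates instead of owning its own parser field.
class HiveContext {
  def hiveql(sql: String): LogicalPlan = HiveQl.parseSql(sql)
}
```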





[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2588#issuecomment-57271331
  
Isn't there a way to augment the existing tests to make sure that the state 
in the driver (BlockManagerMaster) is cleared after removal?





[GitHub] spark pull request: [SPARK-3412][SQL]add missing row api

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2529#issuecomment-57271368
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/208/consoleFull)
 for   PR 2529 at commit 
[`4c18c29`](https://github.com/apache/spark/commit/4c18c29faedd52ca6a3d925ea039841b860862f7).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...

2014-09-30 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/2590#discussion_r18200443
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.PackratParsers
+import scala.util.parsing.input.CharArrayReader.EofCh
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.SqlLexical
+import scala.util.parsing.combinator.lexical.StdLexical
+
+/**
+ * A simple Hive SQL pre parser. It parses the commands like cache,uncache 
etc and 
+ * remaining actual query will be parsed by HiveQl.parseSql 
+ */
+class HiveSqlParser extends StandardTokenParsers with PackratParsers {  
+  
+   def apply(input: String): LogicalPlan = {
+// Special-case out set commands since the value fields can be
+// complex to handle without RegexParsers. Also this approach
+// is clearer for the several possible cases of set commands.
+if (input.trim.toLowerCase.startsWith("set")) {
+  input.trim.drop(3).split("=", 2).map(_.trim) match {
+case Array() => // set
+  SetCommand(None, None)
+case Array(key) => // set key
+  SetCommand(Some(key), None)
+case Array(key, value) => // set key=value
+  SetCommand(Some(key), Some(value))
+  }
+} else if (input.trim.startsWith("!")) {
+  ShellCommand(input.drop(1))
+} else {
+  phrase(query)(new lexical.Scanner(input)) match {
+case Success(r, x) => r
+case x => sys.error(x.toString)
+  }
+}
+  }
+
+  protected case class Keyword(str: String)
+
+  protected val CACHE = Keyword("CACHE")
+  protected val SET = Keyword("SET")
+  protected val ADD = Keyword("ADD")
+  protected val JAR = Keyword("JAR")
+  protected val TABLE = Keyword("TABLE")
+  protected val AS = Keyword("AS")
+  protected val UNCACHE = Keyword("UNCACHE")
+  protected val FILE = Keyword("FILE")
+  protected val DFS = Keyword("DFS")
+  protected val SOURCE = Keyword("SOURCE")
+
+  protected implicit def asParser(k: Keyword): Parser[String] =
+lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
+
+  protected def allCaseConverse(k: String): Parser[String] =
+lexical.allCaseVersions(k).map(x => x : Parser[String]).reduce(_ | _)
 
+  
+  protected val reservedWords =
+this.getClass
+  .getMethods
+  .filter(_.getReturnType == classOf[Keyword])
+  .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+  override val lexical = new SqlLexical(reservedWords)
+  
+  protected lazy val query: Parser[LogicalPlan] = (
+cache | unCache | addJar | addFile | dfs | source | hiveQl
--- End diff --

Nit: I'd prefer `uncache` to `unCache`.





[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2588#issuecomment-57271603
  
Yes - can you submit one? I'm going to merge this because it has been 
blocking a lot of other patches.






[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/2588





[GitHub] spark pull request: SPARK-1830 Deploy failover, Make Persistence e...

2014-09-30 Thread ScrapCodes
Github user ScrapCodes commented on the pull request:

https://github.com/apache/spark/pull/771#issuecomment-57271768
  
Because they were private[spark]. It is very inconvenient for someone to 
write his/her own recovery mode with all of that private[spark]. Also, this 
felt like a developer-facing API, so putting those annotations seemed 
appropriate. What is your opinion on this? 





[GitHub] spark pull request: [SPARK-3166]: Allow custom serialiser to be sh...

2014-09-30 Thread ypwais
Github user ypwais commented on the pull request:

https://github.com/apache/spark/pull/1890#issuecomment-57271841
  
Any chance this might make it into v1.2?  I'd love to use custom 
{Input,Output}Formats (e.g. Parquet), and I personally lost almost a day after 
getting bitten by this classloader issue (especially since it doesn't manifest 
in local mode).  And fixing serialization is nice too ^_^





[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...

2014-09-30 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/2590#discussion_r18200542
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.PackratParsers
+import scala.util.parsing.input.CharArrayReader.EofCh
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.SqlLexical
+import scala.util.parsing.combinator.lexical.StdLexical
+
+/**
+ * A simple Hive SQL pre parser. It parses the commands like cache,uncache 
etc and 
+ * remaining actual query will be parsed by HiveQl.parseSql 
+ */
+class HiveSqlParser extends StandardTokenParsers with PackratParsers {  
+  
+   def apply(input: String): LogicalPlan = {
+// Special-case out set commands since the value fields can be
+// complex to handle without RegexParsers. Also this approach
+// is clearer for the several possible cases of set commands.
+if (input.trim.toLowerCase.startsWith("set")) {
+  input.trim.drop(3).split("=", 2).map(_.trim) match {
+case Array() => // set
+  SetCommand(None, None)
+case Array(key) => // set key
+  SetCommand(Some(key), None)
+case Array(key, value) => // set key=value
+  SetCommand(Some(key), Some(value))
+  }
+} else if (input.trim.startsWith("!")) {
+  ShellCommand(input.drop(1))
+} else {
+  phrase(query)(new lexical.Scanner(input)) match {
+case Success(r, x) => r
+case x => sys.error(x.toString)
+  }
+}
+  }
+
+  protected case class Keyword(str: String)
+
+  protected val CACHE = Keyword("CACHE")
+  protected val SET = Keyword("SET")
+  protected val ADD = Keyword("ADD")
+  protected val JAR = Keyword("JAR")
+  protected val TABLE = Keyword("TABLE")
+  protected val AS = Keyword("AS")
+  protected val UNCACHE = Keyword("UNCACHE")
+  protected val FILE = Keyword("FILE")
+  protected val DFS = Keyword("DFS")
+  protected val SOURCE = Keyword("SOURCE")
+
+  protected implicit def asParser(k: Keyword): Parser[String] =
+lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
+
+  protected def allCaseConverse(k: String): Parser[String] =
+lexical.allCaseVersions(k).map(x => x : Parser[String]).reduce(_ | _)
 
+  
+  protected val reservedWords =
+this.getClass
+  .getMethods
+  .filter(_.getReturnType == classOf[Keyword])
+  .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+  override val lexical = new SqlLexical(reservedWords)
+  
+  protected lazy val query: Parser[LogicalPlan] = (
+cache | unCache | addJar | addFile | dfs | source | hiveQl
+  )  
+  
+  protected lazy val hiveQl: Parser[LogicalPlan] =
+remainingQuery ^^ { 
+  case r => HiveQl.parseSql(r.trim()) 
+}
+  
+  /** It returns all remaining query */
+  protected lazy val remainingQuery: Parser[String] = new Parser[String] {
+def apply(in:Input) = Success(in.source.subSequence(in.offset, 
in.source.length).toString,
--- End diff --

Nit: space after `:`





[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2588#issuecomment-57271851
  
Actually, I took a look, and it does test that. So I am not sure how it was 
passing some of the time earlier. 





[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread rxin
GitHub user rxin opened a pull request:

https://github.com/apache/spark/pull/2591

[SPARK-3709] Executors don't always report broadcast block removal properly 
back to the driver (for branch-1.1)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rxin/spark SPARK-3709-1.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2591.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2591


commit ab99cc0b3cbaa3f83c0feb40436cdf389905b658
Author: Reynold Xin r...@apache.org
Date:   2014-09-30T06:20:39Z

[SPARK-3709] Executors don't always report broadcast block removal properly 
back to the driver







[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2591#issuecomment-57271920
  
We should also cherry-pick this into branch-1.0.

Master branch has been fixed in https://github.com/apache/spark/pull/2588





[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2588#issuecomment-57272107
  
It worked because askSlaves was true and the driver always queries the 
slaves in your afterUnpersist test. The problem is with regard to reporting, 
not whether the block itself has been dropped or not.
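
To make the distinction concrete, here is a hedged sketch of the kind of 
assertion involved, written as if it sat inside the existing block manager 
test suite (so `master` is the driver-side BlockManagerMaster, and the 
`getMatchingBlockIds(filter, askSlaves)` helper is assumed to be available 
there, as in the current suites):

```
// Sketch only; assumes this runs inside the existing suite where `master`
// (the driver-side BlockManagerMaster) and the storage classes are in scope.
val isBroadcastBlock = (id: BlockId) => id.isInstanceOf[BroadcastBlockId]

// askSlaves = true falls back to querying the executors, so the check can pass
// even when the executors never reported the removal back to the driver.
assert(master.getMatchingBlockIds(isBroadcastBlock, askSlaves = true).isEmpty)

// askSlaves = false consults only the driver's own registry, which is exactly
// the reporting path being discussed here.
assert(master.getMatchingBlockIds(isBroadcastBlock, askSlaves = false).isEmpty)
```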






[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2588#issuecomment-57272188
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21022/





[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2588#issuecomment-57272186
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21022/consoleFull)
 for   PR 2588 at commit 
[`6dab2e3`](https://github.com/apache/spark/commit/6dab2e30b0f2f9e1cb38f4b0ed93fc57d65a978c).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2590#issuecomment-57272266
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/209/consoleFull)
 for   PR 2590 at commit 
[`ba26cd1`](https://github.com/apache/spark/commit/ba26cd1a5895fa71a028992b5c946bd8333dfdc3).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...

2014-09-30 Thread liancheng
Github user liancheng commented on the pull request:

https://github.com/apache/spark/pull/2590#issuecomment-57272298
  
Awesome! I just started working on this last weekend and you've already got 
it done :) Left some minor comments. This generally LGTM.





[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2432#issuecomment-57272310
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21020/





[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2432#issuecomment-57272306
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21020/consoleFull)
 for   PR 2432 at commit 
[`f6af132`](https://github.com/apache/spark/commit/f6af132cbfba32d2e7434f6b027f1cd6188ea6f2).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `public class ApplicationId implements Serializable `
  * `  case class KillExecutor(`
  * `  case class MasterChangeAcknowledged(appId: ApplicationId)`
  * `  case class RegisteredApplication(appId: ApplicationId, masterUrl: 
String) extends DeployMessage`






[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2591#issuecomment-57272501
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21024/





[GitHub] spark pull request: [SPARK-3713][SQL] Uses JSON to serialize DataT...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2563#issuecomment-57272666
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/210/consoleFull)
 for   PR 2563 at commit 
[`03da3ec`](https://github.com/apache/spark/commit/03da3ec870940bd6ff56e03450993da6125b40a4).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2330#issuecomment-57272680
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21025/consoleFull)
 for   PR 2330 at commit 
[`0dae310`](https://github.com/apache/spark/commit/0dae31022fa26abc806db94e02fa7c15a031d1c1).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18200872
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.python
+
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.api.java._
+import org.apache.spark.api.python._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Interval, Duration, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.api.java._
+
+
+/**
+ * Interface for Python callback function with three arguments
+ */
+trait PythonRDDFunction {
+  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
+}
+
+/**
+ * Wrapper for PythonRDDFunction
+ */
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
--- End diff --

I think I get it. You need a simple trait / interface (`PythonRDDFunction`) 
that is used as the interface for creating Python function objects through 
py4j. And then you need to convert it to a `Function2` object (using the 
`RDDFunction` class) so that it can be passed on to `DStream.foreachRDD`. 
However, I am not sure this warrants a separate class with a confusing name. 
How about static conversions added to `object PythonRDDFunction`, like:
```
object PythonRDDFunction {
   implicit def toFunction2(): Option[Function2[]] = {
   }
}
```
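
As a rough illustration only (not part of the review thread's code), one way 
such a conversion could be filled in, reusing the names from the quoted diff 
and returning the `Function2` directly rather than an `Option`; treat the 
exact types as assumptions:

```
object PythonRDDFunction {
  import java.util.{List => JList}
  import org.apache.spark.api.java.{JavaRDD, function}
  import org.apache.spark.streaming.Time

  // Adapts the py4j-facing callback into the Function2 shape that the
  // DStream/JavaDStream APIs expect, so no separately named wrapper class
  // is needed.
  implicit def toFunction2(pfunc: PythonRDDFunction)
    : function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] =
    new function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {
      def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] =
        pfunc.call(time.milliseconds, rdds)
    }
}
```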






[GitHub] spark pull request: [SPARK-3734] DriverRunner should not read SPAR...

2014-09-30 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/2586#issuecomment-57272762
  
LGTM. Merging into master and 1.1





[GitHub] spark pull request: [SPARK-3412][SQL]add missing row api

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2529#issuecomment-57272803
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/207/consoleFull)
 for   PR 2529 at commit 
[`4c18c29`](https://github.com/apache/spark/commit/4c18c29faedd52ca6a3d925ea039841b860862f7).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-3734] DriverRunner should not read SPAR...

2014-09-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/2586





[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18200937
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.python
+
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.api.java._
+import org.apache.spark.api.python._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Interval, Duration, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.api.java._
+
+
+/**
+ * Interface for Python callback function with three arguments
+ */
+trait PythonRDDFunction {
--- End diff --

The name is very non-intuitive. This is just an interface for defining 
functions that can be used in `DStream.transform`. A more intuitive name (at 
least to me) is `DStreamTransformFunction`. What do you think?





[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...

2014-09-30 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2432#discussion_r18200976
  
--- Diff: core/src/main/java/org/apache/spark/ApplicationId.java ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+
+/**
+ * This class represents unique application id for identifying each 
application
+ */
+public class ApplicationId implements Serializable {
+
+  private String appId;
+
+  public ApplicationId(String appId) {
+this.appId = appId;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+if (!(other instanceof ApplicationId)) {
+  return false;
+} else if (other == this) {
+  return true;
+} else if (appId != null) {
+  return appId.equals(((ApplicationId)other).appId);
+} else {
+  return false;
+}
+  }
+
+  @Override
+  public int hashCode() {
+return appId.hashCode();
+  }
+
+  @Override
+  public String toString() {
+return appId;
+  }
+
+
+  private static ApplicationId defaultAppId =
+new ApplicationId(String.valueOf(System.currentTimeMillis()));
--- End diff --

O.K. I've modified that.





[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18201011
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.python
+
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.api.java._
+import org.apache.spark.api.python._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Interval, Duration, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.api.java._
+
+
+/**
+ * Interface for Python callback function with three arguments
+ */
+trait PythonRDDFunction {
+  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
+}
+
+/**
+ * Wrapper for PythonRDDFunction
+ */
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
+  extends function.Function2[JList[JavaRDD[_]], Time, 
JavaRDD[Array[Byte]]] with Serializable {
+
+  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
--- End diff --

Since they are not used outside `RDDFunction`, they should at least be 
private to this class. 





[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18201085
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.python
+
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.api.java._
+import org.apache.spark.api.python._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Interval, Duration, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.api.java._
+
+
+/**
+ * Interface for Python callback function with three arguments
+ */
+trait PythonRDDFunction {
+  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
+}
+
+/**
+ * Wrapper for PythonRDDFunction
+ */
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
+  extends function.Function2[JList[JavaRDD[_]], Time, 
JavaRDD[Array[Byte]]] with Serializable {
+
+  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
+if (rdd.isDefined) {
+  JavaRDD.fromRDD(rdd.get)
+} else {
+  null
+}
+  }
+
+  def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = {
+if (jrdd != null) {
+  Some(jrdd.rdd)
+} else {
+  None
+}
+  }
+
+  def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
+some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava))
+  }
+
+  def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): 
Option[RDD[Array[Byte]]] = {
+some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), 
wrapRDD(rdd2)).asJava))
+  }
+
+  // for JFunction2
+  def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = {
+pfunc.call(time.milliseconds, rdds)
+  }
+}
+
+private[python]
+abstract class PythonDStream(parent: DStream[_]) extends 
DStream[Array[Byte]] (parent.ssc) {
+
+  override def dependencies = List(parent)
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  val asJavaDStream  = JavaDStream.fromDStream(this)
+}
+
+private[spark] object PythonDStream {
--- End diff --

Why `private[spark]` and not `private[python]` ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2706][SQL] Enable Spark to support Hive...

2014-09-30 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/2241#issuecomment-57273157
  
Hey @zhzhan I've published a modified version of Hive 0.13 that we can link 
against. A few benefits are:

1. I fixed the hive-exec jar so it only contains hive packages and not a 
bunch of other code.
2. I'm using a shaded version of the protobuf dependency (otherwise, this 
interferes with protobuf found in older hadoop client versions).


https://oss.sonatype.org/content/repositories/orgspark-project-1077/org/spark-project/hive/

For now you'll need to add this repo to the build to use it, but if it all 
works I can just fully publish this to maven central.
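For anyone who wants to try it before that happens, a minimal sketch of wiring
the staging repository into an sbt build (only the repository URL above is
real; the resolver name and the dependency coordinates below are assumptions
to be checked against the repository listing):

    resolvers += "spark-project staging" at
      "https://oss.sonatype.org/content/repositories/orgspark-project-1077/"

    // illustrative coordinates only
    libraryDependencies += "org.spark-project.hive" % "hive-exec" % "0.13.1"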


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18201096
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.python
+
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.api.java._
+import org.apache.spark.api.python._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Interval, Duration, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.api.java._
+
+
+/**
+ * Interface for Python callback function with three arguments
+ */
+trait PythonRDDFunction {
+  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
+}
+
+/**
+ * Wrapper for PythonRDDFunction
+ */
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
+  extends function.Function2[JList[JavaRDD[_]], Time, 
JavaRDD[Array[Byte]]] with Serializable {
+
+  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
+if (rdd.isDefined) {
+  JavaRDD.fromRDD(rdd.get)
+} else {
+  null
+}
+  }
+
+  def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = {
+if (jrdd != null) {
+  Some(jrdd.rdd)
+} else {
+  None
+}
+  }
+
+  def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
+some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava))
+  }
+
+  def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): 
Option[RDD[Array[Byte]]] = {
+some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), 
wrapRDD(rdd2)).asJava))
+  }
+
+  // for JFunction2
+  def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = {
+pfunc.call(time.milliseconds, rdds)
+  }
+}
+
+private[python]
+abstract class PythonDStream(parent: DStream[_]) extends 
DStream[Array[Byte]] (parent.ssc) {
+
+  override def dependencies = List(parent)
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  val asJavaDStream  = JavaDStream.fromDStream(this)
+}
+
+private[spark] object PythonDStream {
+
+  // helper function for DStream.foreachRDD(),
+  // cannot be `foreachRDD`, it would confuse py4j
+  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pyfunc: 
PythonRDDFunction){
+val func = new RDDFunction(pyfunc)
+jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
+  }
+
+  // helper function for ssc.transform()
+  def callTransform(ssc: JavaStreamingContext, jdsteams: 
JList[JavaDStream[_]],
+pyfunc: PythonRDDFunction)
+:JavaDStream[Array[Byte]] = {
+val func = new RDDFunction(pyfunc)
+ssc.transform(jdsteams, func)
+  }
+
+  // convert list of RDD into queue of RDDs, for ssc.queueStream()
+  def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): 
java.util.Queue[JavaRDD[Array[Byte]]] = {
+val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
+rdds.forall(queue.add(_))
+queue
+  }
+}
+
+/**
+ * Transformed DStream in Python.
+ *
+ * If the result RDD is PythonRDD, then it will cache it as a template 
for future use,
+ * this can reduce the Python callbacks.
+ */
+private[spark]
--- End diff --

Why `private[spark]` and not `private[python]`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at 

[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r1820
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.python
+
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.api.java._
+import org.apache.spark.api.python._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Interval, Duration, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.api.java._
+
+
+/**
+ * Interface for Python callback function with three arguments
+ */
+trait PythonRDDFunction {
+  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
+}
+
+/**
+ * Wrapper for PythonRDDFunction
+ */
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
+  extends function.Function2[JList[JavaRDD[_]], Time, 
JavaRDD[Array[Byte]]] with Serializable {
+
+  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
+if (rdd.isDefined) {
+  JavaRDD.fromRDD(rdd.get)
+} else {
+  null
+}
+  }
+
+  def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = {
+if (jrdd != null) {
+  Some(jrdd.rdd)
+} else {
+  None
+}
+  }
+
+  def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
+some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava))
+  }
+
+  def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): 
Option[RDD[Array[Byte]]] = {
+some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), 
wrapRDD(rdd2)).asJava))
+  }
+
+  // for JFunction2
+  def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = {
+pfunc.call(time.milliseconds, rdds)
+  }
+}
+
+private[python]
+abstract class PythonDStream(parent: DStream[_]) extends 
DStream[Array[Byte]] (parent.ssc) {
+
+  override def dependencies = List(parent)
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  val asJavaDStream  = JavaDStream.fromDStream(this)
+}
+
+private[spark] object PythonDStream {
+
+  // helper function for DStream.foreachRDD(),
+  // cannot be `foreachRDD`, it would confuse py4j
+  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pyfunc: 
PythonRDDFunction){
+val func = new RDDFunction(pyfunc)
+jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
+  }
+
+  // helper function for ssc.transform()
+  def callTransform(ssc: JavaStreamingContext, jdsteams: 
JList[JavaDStream[_]],
+pyfunc: PythonRDDFunction)
+:JavaDStream[Array[Byte]] = {
+val func = new RDDFunction(pyfunc)
+ssc.transform(jdsteams, func)
+  }
+
+  // convert list of RDD into queue of RDDs, for ssc.queueStream()
+  def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): 
java.util.Queue[JavaRDD[Array[Byte]]] = {
+val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
+rdds.forall(queue.add(_))
+queue
+  }
+}
+
+/**
+ * Transformed DStream in Python.
+ *
+ * If the result RDD is PythonRDD, then it will cache it as a template 
for future use,
+ * this can reduce the Python callbacks.
+ */
+private[spark]
+class PythonTransformedDStream (parent: DStream[_], pfunc: 
PythonRDDFunction,
+var reuse: Boolean = false)
+  extends PythonDStream(parent) {
+
+  val func = new RDDFunction(pfunc)
+  var lastResult: PythonRDD = _
+
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = 

[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18201102
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.python
+
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.api.java._
+import org.apache.spark.api.python._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Interval, Duration, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.api.java._
+
+
+/**
+ * Interface for Python callback function with three arguments
+ */
+trait PythonRDDFunction {
+  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
+}
+
+/**
+ * Wrapper for PythonRDDFunction
+ */
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
+  extends function.Function2[JList[JavaRDD[_]], Time, 
JavaRDD[Array[Byte]]] with Serializable {
+
+  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
+if (rdd.isDefined) {
+  JavaRDD.fromRDD(rdd.get)
+} else {
+  null
+}
+  }
+
+  def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = {
+if (jrdd != null) {
+  Some(jrdd.rdd)
+} else {
+  None
+}
+  }
+
+  def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
+some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava))
+  }
+
+  def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): 
Option[RDD[Array[Byte]]] = {
+some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), 
wrapRDD(rdd2)).asJava))
+  }
+
+  // for JFunction2
+  def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = {
+pfunc.call(time.milliseconds, rdds)
+  }
+}
+
+private[python]
+abstract class PythonDStream(parent: DStream[_]) extends 
DStream[Array[Byte]] (parent.ssc) {
+
+  override def dependencies = List(parent)
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  val asJavaDStream  = JavaDStream.fromDStream(this)
+}
+
+private[spark] object PythonDStream {
+
+  // helper function for DStream.foreachRDD(),
+  // cannot be `foreachRDD`, it would confuse py4j
+  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pyfunc: 
PythonRDDFunction){
+val func = new RDDFunction(pyfunc)
+jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
+  }
+
+  // helper function for ssc.transform()
+  def callTransform(ssc: JavaStreamingContext, jdsteams: 
JList[JavaDStream[_]],
+pyfunc: PythonRDDFunction)
+:JavaDStream[Array[Byte]] = {
+val func = new RDDFunction(pyfunc)
+ssc.transform(jdsteams, func)
+  }
+
+  // convert list of RDD into queue of RDDs, for ssc.queueStream()
+  def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): 
java.util.Queue[JavaRDD[Array[Byte]]] = {
+val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
+rdds.forall(queue.add(_))
+queue
+  }
+}
+
+/**
+ * Transformed DStream in Python.
+ *
+ * If the result RDD is PythonRDD, then it will cache it as a template 
for future use,
+ * this can reduce the Python callbacks.
+ */
+private[spark]
+class PythonTransformedDStream (parent: DStream[_], pfunc: 
PythonRDDFunction,
+var reuse: Boolean = false)
+  extends PythonDStream(parent) {
+
+  val func = new RDDFunction(pfunc)
+  var lastResult: PythonRDD = _
+
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = 

[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18201113
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.python
+
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.api.java._
+import org.apache.spark.api.python._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Interval, Duration, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.api.java._
+
+
+/**
+ * Interface for Python callback function with three arguments
+ */
+trait PythonRDDFunction {
+  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
+}
+
+/**
+ * Wrapper for PythonRDDFunction
+ */
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
+  extends function.Function2[JList[JavaRDD[_]], Time, 
JavaRDD[Array[Byte]]] with Serializable {
+
+  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
+if (rdd.isDefined) {
+  JavaRDD.fromRDD(rdd.get)
+} else {
+  null
+}
+  }
+
+  def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = {
+if (jrdd != null) {
+  Some(jrdd.rdd)
+} else {
+  None
+}
+  }
+
+  def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
+some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava))
+  }
+
+  def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): 
Option[RDD[Array[Byte]]] = {
+some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), 
wrapRDD(rdd2)).asJava))
+  }
+
+  // for JFunction2
+  def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = {
+pfunc.call(time.milliseconds, rdds)
+  }
+}
+
+private[python]
+abstract class PythonDStream(parent: DStream[_]) extends 
DStream[Array[Byte]] (parent.ssc) {
+
+  override def dependencies = List(parent)
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  val asJavaDStream  = JavaDStream.fromDStream(this)
+}
+
+private[spark] object PythonDStream {
+
+  // helper function for DStream.foreachRDD(),
+  // cannot be `foreachRDD`, it would confuse py4j
+  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pyfunc: 
PythonRDDFunction){
+val func = new RDDFunction(pyfunc)
+jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
+  }
+
+  // helper function for ssc.transform()
+  def callTransform(ssc: JavaStreamingContext, jdsteams: 
JList[JavaDStream[_]],
+pyfunc: PythonRDDFunction)
+:JavaDStream[Array[Byte]] = {
+val func = new RDDFunction(pyfunc)
+ssc.transform(jdsteams, func)
+  }
+
+  // convert list of RDD into queue of RDDs, for ssc.queueStream()
+  def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): 
java.util.Queue[JavaRDD[Array[Byte]]] = {
+val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
+rdds.forall(queue.add(_))
+queue
+  }
+}
+
+/**
+ * Transformed DStream in Python.
+ *
+ * If the result RDD is PythonRDD, then it will cache it as a template 
for future use,
+ * this can reduce the Python callbacks.
+ */
+private[spark]
+class PythonTransformedDStream (parent: DStream[_], pfunc: 
PythonRDDFunction,
+var reuse: Boolean = false)
+  extends PythonDStream(parent) {
+
+  val func = new RDDFunction(pfunc)
+  var lastResult: PythonRDD = _
+
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = 

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18201177
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+peerFetchLock.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 
* 1000) // milliseconds
+  val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+  if (cachedPeers == null || forceFetch || timeout) {
+cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+  cachedPeers
+}
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersForReplication = new ArrayBuffer[BlockManagerId]
--- End diff --

Why 3? This will be as large as `cluster size - 1`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18201208
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+peerFetchLock.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 
* 1000) // milliseconds
+  val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+  if (cachedPeers == null || forceFetch || timeout) {
+cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+  cachedPeers
+}
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersForReplication = new ArrayBuffer[BlockManagerId]
--- End diff --

nvm ignore that comment


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18201230
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+peerFetchLock.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 
* 1000) // milliseconds
+  val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+  if (cachedPeers == null || forceFetch || timeout) {
+cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+  cachedPeers
+}
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersForReplication = new ArrayBuffer[BlockManagerId]
+val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
+val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var replicationFailed = false
+var failures = 0
+var done = false
+
+// Get cached list of peers
+peersForReplication ++= getPeers(forceFetch = false)
+
+// Get a random peer. Note that this selection of a peer is 
deterministic on the block id.
+// So assuming the list of peers does not change and no replication 
failures,
+// if there are multiple attempts in the same node to replicate the 
same block,
+// the same set of peers will be selected.
+def getRandomPeer(): Option[BlockManagerId] = {
+  // If replication had failed, then force update the cached list of 
peers and remove the peers
+  // that have been already used
+  if (replicationFailed) {
+peersForReplication.clear()
+peersForReplication ++= getPeers(forceFetch = true)
+peersForReplication --= peersReplicatedTo
+peersForReplication --= peersFailedToReplicateTo
+  }
+  if (!peersForReplication.isEmpty) {
+Some(peersForReplication(random.nextInt(peersForReplication.size)))
+  } else {
+None
+  }
 }
-for (peer: BlockManagerId <- cachedPeers) {
-  val start = System.nanoTime
-  data.rewind()
-  logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
-s"To node: $peer")
 
-  try {
-blockTransferService.uploadBlockSync(
-  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-  } catch {
-case e: Exception =>
-  logError(s"Failed to replicate block to $peer", e)
+// One by one choose a random peer and try uploading the block to it
+// If replication fails (e.g., target peer is down), force the list of 
cached peers
+// to be re-fetched from driver and then pick another random peer for 
replication. Also
+// temporarily black list the peer for which replication failed.
+//
+// This selection of a peer and replication is continued in a loop 
until one of the
+// following 3 conditions is fulfilled:
+// (i) specified number of peers have been replicated to
+// (ii) too many failures in replicating to peers
+// (iii) no peer left to replicate to
+//
+while (!done) {
+  getRandomPeer() match {
+case Some(peer) =>
+  try {
+val onePeerStartTime = System.currentTimeMillis
+data.rewind()
+logTrace(s"Trying to replicate $blockId of ${data.limit()} 
bytes to $peer")
+blockTransferService.uploadBlockSync(
+  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
+logTrace(s"Replicated $blockId of ${data.limit()} bytes to 
$peer in %f ms"
+  .format((System.currentTimeMillis - onePeerStartTime) / 1e3))
+

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18201286
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+peerFetchLock.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 
* 1000) // milliseconds
+  val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+  if (cachedPeers == null || forceFetch || timeout) {
+cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+  cachedPeers
+}
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersForReplication = new ArrayBuffer[BlockManagerId]
+val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
+val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var replicationFailed = false
+var failures = 0
+var done = false
+
+// Get cached list of peers
+peersForReplication ++= getPeers(forceFetch = false)
+
+// Get a random peer. Note that this selection of a peer is 
deterministic on the block id.
+// So assuming the list of peers does not change and no replication 
failures,
+// if there are multiple attempts in the same node to replicate the 
same block,
+// the same set of peers will be selected.
+def getRandomPeer(): Option[BlockManagerId] = {
+  // If replication had failed, then force update the cached list of 
peers and remove the peers
+  // that have been already used
+  if (replicationFailed) {
+peersForReplication.clear()
+peersForReplication ++= getPeers(forceFetch = true)
+peersForReplication --= peersReplicatedTo
+peersForReplication --= peersFailedToReplicateTo
+  }
+  if (!peersForReplication.isEmpty) {
+Some(peersForReplication(random.nextInt(peersForReplication.size)))
+  } else {
+None
+  }
 }
-for (peer: BlockManagerId <- cachedPeers) {
-  val start = System.nanoTime
-  data.rewind()
-  logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
-s"To node: $peer")
 
-  try {
-blockTransferService.uploadBlockSync(
-  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-  } catch {
-case e: Exception =>
-  logError(s"Failed to replicate block to $peer", e)
+// One by one choose a random peer and try uploading the block to it
+// If replication fails (e.g., target peer is down), force the list of 
cached peers
+// to be re-fetched from driver and then pick another random peer for 
replication. Also
+// temporarily black list the peer for which replication failed.
+//
+// This selection of a peer and replication is continued in a loop 
until one of the
+// following 3 conditions is fulfilled:
+// (i) specified number of peers have been replicated to
+// (ii) too many failures in replicating to peers
+// (iii) no peer left to replicate to
+//
+while (!done) {
+  getRandomPeer() match {
+case Some(peer) =>
+  try {
+val onePeerStartTime = System.currentTimeMillis
+data.rewind()
+logTrace(s"Trying to replicate $blockId of ${data.limit()} 
bytes to $peer")
+blockTransferService.uploadBlockSync(
+  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
+logTrace(s"Replicated $blockId of ${data.limit()} bytes to 
$peer in %f ms"
+  .format((System.currentTimeMillis - onePeerStartTime) / 1e3))
+

[GitHub] spark pull request: [SPARK-3613] Record only average block size in...

2014-09-30 Thread Ishiihara
Github user Ishiihara commented on the pull request:

https://github.com/apache/spark/pull/2470#issuecomment-57274044
  
@rxin I looked through Roaring bitmap and that is a highly compressed 
bitmap compared with other bitmap implementations. I will start working on this 
and keep you updated with progress and issues coming up during implementation. 
Thanks!
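For context, a rough sketch of the direction being discussed: keep a
RoaringBitmap of the empty blocks plus a single average size for the non-empty
ones, instead of one size per block (the surrounding bookkeeping here is
assumed; only the RoaringBitmap calls are the library's actual API):

    import org.roaringbitmap.RoaringBitmap

    val sizes = Array(0L, 512L, 0L, 2048L)        // per-block sizes from one map task
    val emptyBlocks = new RoaringBitmap()
    sizes.zipWithIndex.foreach { case (size, i) =>
      if (size == 0L) emptyBlocks.add(i)          // compressed set of empty block ids
    }
    val nonEmpty = sizes.count(_ > 0L)
    val avgSize = if (nonEmpty > 0) sizes.sum / nonEmpty else 0L
    // a reducer can then skip fetching block i whenever emptyBlocks.contains(i)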


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SQL] Prevents per row dynamic dispatching and...

2014-09-30 Thread liancheng
GitHub user liancheng opened a pull request:

https://github.com/apache/spark/pull/2592

[SQL] Prevents per row dynamic dispatching and pattern matching when 
inserting Hive values

Builds all wrappers at first according to object inspector types to avoid 
per row costs.

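As a rough illustration of the pattern (stand-in types below, not the PR's
actual Hive ObjectInspector code): resolve one wrapper function per field when
the writer is set up, then apply the precomputed wrappers to every row, so no
per-row pattern matching or dynamic dispatch remains.

    sealed trait FieldType
    case object IntType extends FieldType
    case object StringType extends FieldType

    // dispatch on the field type once, up front
    def wrapperFor(t: FieldType): Any => Any = t match {
      case IntType    => v => v.asInstanceOf[Int]
      case StringType => v => v.toString
    }

    val schema: Seq[FieldType] = Seq(IntType, StringType)
    val wrappers: Array[Any => Any] = schema.map(wrapperFor).toArray

    // per row: plain array indexing, no match
    def convertRow(row: Seq[Any]): Seq[Any] =
      row.zipWithIndex.map { case (v, i) => wrappers(i)(v) }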
TODO:

- [ ] Micro benchmark

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liancheng/spark hive-value-wrapper

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2592.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2592


commit 499866606d079c30bae50d799bbe1ba1488ec343
Author: Cheng Lian lian.cs@gmail.com
Date:   2014-09-30T06:55:09Z

Prevents per row dynamic dispatching and pattern matching when inserting 
Hive values




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3412][SQL]add missing row api

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2529#issuecomment-57274900
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/208/consoleFull)
 for   PR 2529 at commit 
[`4c18c29`](https://github.com/apache/spark/commit/4c18c29faedd52ca6a3d925ea039841b860862f7).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SQL] Prevents per row dynamic dispatching and...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2592#issuecomment-57274920
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21026/consoleFull)
 for   PR 2592 at commit 
[`4998666`](https://github.com/apache/spark/commit/499866606d079c30bae50d799bbe1ba1488ec343).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18201664
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -413,7 +413,7 @@ class StreamingContext private[streaming] (
   dstreams: Seq[DStream[_]],
   transformFunc: (Seq[RDD[_]], Time) => RDD[T]
 ): DStream[T] = {
-new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
+new TransformedDStream[T](dstreams, transformFunc)
--- End diff --

Without this change, ssc.transform() will fail to serialize 
transformFunc, because the callback function from python is not serializable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2590#issuecomment-57276077
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/209/consoleFull)
 for   PR 2590 at commit 
[`ba26cd1`](https://github.com/apache/spark/commit/ba26cd1a5895fa71a028992b5c946bd8333dfdc3).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...

2014-09-30 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2432#discussion_r18202042
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala ---
@@ -34,9 +36,11 @@ private[spark] trait SchedulerBackend {
 
   /**
* The application ID associated with the job, if any.
+   * It is expected that the subclasses of TaskScheduler or 
SchedulerBackend
+   * override this method and return a unique application ID.
*
-   * @return The application ID, or None if the backend does not provide 
an ID.
+   * @return The application ID, if the backend does not provide an ID.
*/
-  def applicationId(): Option[String] = None
+  def applicationId() = new 
ApplicationId(System.currentTimeMillis.toString)
--- End diff --

I've checked Mesos' code and I found the following.

* A FrameworkID is assigned by the Mesos master, at registration time, to each 
process that runs a SchedulerDriver (for Mesos, a MesosSchedulerDriver). Instances 
of MesosSchedulerDriver are contained in 
CoarseMesosSchedulerBackend/MesosSchedulerBackend.

* When CoarseMesosSchedulerBackend/MesosSchedulerBackend#start is invoked, the 
Mesos master adds the framework to its list of registered frameworks.

* When CoarseMesosSchedulerBackend/MesosSchedulerBackend#stop is invoked, the 
master removes the framework from that list.

So, I think we can use the FrameworkID as a unique application ID.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2432#issuecomment-57276520
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21027/consoleFull)
 for   PR 2432 at commit 
[`42bea55`](https://github.com/apache/spark/commit/42bea55e3a4b665f8b3a6f9a423a2a199ea55093).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2330#issuecomment-57276579
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21025/consoleFull)
 for   PR 2330 at commit 
[`0dae310`](https://github.com/apache/spark/commit/0dae31022fa26abc806db94e02fa7c15a031d1c1).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2330#issuecomment-57276584
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21025/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] Scalastyle is neve...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2520#issuecomment-57276930
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21028/consoleFull)
 for   PR 2520 at commit 
[`641b9e7`](https://github.com/apache/spark/commit/641b9e704d72b4111b1fb8662c523e31559980d8).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2591#issuecomment-57277304
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/212/consoleFull)
 for   PR 2591 at commit 
[`ab99cc0`](https://github.com/apache/spark/commit/ab99cc0b3cbaa3f83c0feb40436cdf389905b658).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-09-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18202539
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -43,9 +46,34 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
 throw new NotImplementedError("clone() is not implemented.")
 }
 
+private [spark]
+object RandomSampler {
+  // Default random number generator used by random samplers
+  def rngDefault: Random = new XORShiftRandom
+
+  // Default gap sampling maximum
+  // For sampling fractions <= this value, the gap sampling optimization 
will be applied.
+  // Above this value, it is assumed that traditional bernoulli sampling 
is faster.  The
+  // optimal value for this will depend on the RNG.  More expensive RNGs 
will tend to make
+  // the optimal value higher.  The most reliable way to determine this 
value for a given RNG
+  // is to experiment.  I would expect a value of 0.5 to be close in most 
cases.
+  def gsmDefault: Double = 0.4
+
+  // Default gap sampling epsilon
+  // When sampling random floating point values the gap sampling logic 
requires value  0.  An
+  // optimal value for this parameter is at or near the minimum positive 
floating point value
+  // returned by nextDouble() for the RNG being used.
+  def epsDefault: Double = 5e-11
--- End diff --

Yeah I meant add .0 for clarity, then subtract `Double` for being overkill. 
I think this would be more consistent with Scala/Spark style that way, but at 
least I'd argue for the .0. Trivial here; more of a minor question for the 
whole code base.
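For readers skimming the thread: the gap sampling optimization these defaults
tune replaces a per-element coin flip with drawing the size of the gap to the
next accepted element from a geometric distribution. A minimal standalone
sketch of the idea (not this PR's implementation):

    import scala.util.Random

    def gapSample[T](data: Seq[T], f: Double, rng: Random = new Random): Seq[T] = {
      require(f > 0.0 && f < 1.0)
      val out = scala.collection.mutable.ArrayBuffer.empty[T]
      val lnQ = math.log(1.0 - f)
      var i = -1
      while (i < data.length) {
        val u = math.max(rng.nextDouble(), 1e-12)  // keep u > 0, cf. the epsilon above
        i += 1 + (math.log(u) / lnQ).toInt         // gap ~ Geometric(f)
        if (i < data.length) out += data(i)
      }
      out.toSeq
    }

Each accepted element costs roughly one nextDouble() call, which is why the
optimization only pays off below some fraction threshold like the gsmDefault
above.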


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SQL] Prevents per row dynamic dispatching and...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2592#issuecomment-57277565
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21026/consoleFull)
 for   PR 2592 at commit 
[`4998666`](https://github.com/apache/spark/commit/499866606d079c30bae50d799bbe1ba1488ec343).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SQL] Prevents per row dynamic dispatching and...

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2592#issuecomment-57277568
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21026/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-09-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18202776
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +81,237 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = 
false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: 
Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  // epsilon slop to avoid failure from floating point jitter
--- End diff --

Ah right, I still had in mind that the check was using `ub-lb`, but it 
isn't. Below, in both proposed versions of the code, could you not also 
specially handle the case where ub-lb >= 1?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18202866
  
--- Diff: python/pyspark/serializers.py ---
@@ -114,6 +114,9 @@ def __ne__(self, other):
 def __repr__(self):
 return "%s object" % self.__class__.__name__
 
+def __hash__(self):
+return hash(str(self))
--- End diff --

This is necessary: we need to check the serializers of DStreams.
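For illustration, a minimal sketch of the kind of check this enables (the
class below is a stand-in for pyspark.serializers.Serializer, not the actual
module):

    class Serializer(object):
        def __eq__(self, other):
            return isinstance(other, self.__class__)

        def __ne__(self, other):
            return not self.__eq__(other)

        def __repr__(self):
            return "%s object" % self.__class__.__name__

        def __hash__(self):
            return hash(str(self))

    class PickleSerializer(Serializer):
        pass

    # e.g. only combine two DStreams directly when their serializers agree
    assert PickleSerializer() == PickleSerializer()
    assert len({PickleSerializer(), PickleSerializer()}) == 1  # hashable, so usable in sets/dicts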


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2538#issuecomment-57278200
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21029/consoleFull)
 for   PR 2538 at commit 
[`e00136b`](https://github.com/apache/spark/commit/e00136b3dfd330689d89e44006a49871b36a4825).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18202884
  
--- Diff: python/pyspark/streaming/context.py ---
@@ -0,0 +1,243 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from py4j.java_collections import ListConverter
+from py4j.java_gateway import java_import
+
+from pyspark import RDD
+from pyspark.serializers import UTF8Deserializer
+from pyspark.context import SparkContext
+from pyspark.storagelevel import StorageLevel
+from pyspark.streaming.dstream import DStream
+from pyspark.streaming.util import RDDFunction
+
+__all__ = ["StreamingContext"]
+
+
+def _daemonize_callback_server():
+"""
+Hack Py4J to daemonize callback server
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18202894
  
--- Diff: python/pyspark/streaming/dstream.py ---
@@ -0,0 +1,633 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from itertools import chain, ifilter, imap
+import operator
+import time
+from datetime import datetime
+
+from pyspark import RDD
+from pyspark.storagelevel import StorageLevel
+from pyspark.streaming.util import rddToFileName, RDDFunction
+from pyspark.rdd import portable_hash
+from pyspark.resultiterable import ResultIterable
+
+__all__ = ["DStream"]
+
+
+class DStream(object):
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18202907
  
--- Diff: python/pyspark/streaming/dstream.py ---
@@ -0,0 +1,633 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from itertools import chain, ifilter, imap
+import operator
+import time
+from datetime import datetime
+
+from pyspark import RDD
+from pyspark.storagelevel import StorageLevel
+from pyspark.streaming.util import rddToFileName, RDDFunction
+from pyspark.rdd import portable_hash
+from pyspark.resultiterable import ResultIterable
+
+__all__ = ["DStream"]
+
+
+class DStream(object):
+    def __init__(self, jdstream, ssc, jrdd_deserializer):
+        self._jdstream = jdstream
+        self._ssc = ssc
+        self.ctx = ssc._sc
+        self._jrdd_deserializer = jrdd_deserializer
+        self.is_cached = False
+        self.is_checkpointed = False
+
+    def context(self):
+        """
+        Return the StreamingContext associated with this DStream
+        """
+        return self._ssc
+
+    def count(self):
+        """
+        Return a new DStream which contains the number of elements in this DStream.
+        """
+        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
+
+    def sum(self):
+        """
+        Add up the elements in this DStream.
+        """
+        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+
+    def filter(self, f):
+        """
+        Return a new DStream containing only the elements that satisfy predicate.
+        """
+        def func(iterator):
+            return ifilter(f, iterator)
+        return self.mapPartitions(func, True)
+
+    def flatMap(self, f, preservesPartitioning=False):
+        """
+        Pass each value in the key-value pair DStream through flatMap function
+        without changing the keys: this also retains the original RDD's partition.
+        """
+        def func(s, iterator):
+            return chain.from_iterable(imap(f, iterator))
+        return self.mapPartitionsWithIndex(func, preservesPartitioning)
+
+    def map(self, f, preservesPartitioning=False):
+        """
+        Return a new DStream by applying a function to each element of DStream.
+        """
+        def func(iterator):
+            return imap(f, iterator)
+        return self.mapPartitions(func, preservesPartitioning)
+
+    def mapPartitions(self, f, preservesPartitioning=False):
+        """
+        Return a new DStream by applying a function to each partition of this DStream.
+        """
+        def func(s, iterator):
+            return f(iterator)
+        return self.mapPartitionsWithIndex(func, preservesPartitioning)
+
+    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        """
+        Return a new DStream by applying a function to each partition of this DStream,
+        while tracking the index of the original partition.
+        """
+        return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, preservesPartitioning))
+
+    def reduce(self, func):
+        """
+        Return a new DStream by reducing the elements of this RDD using the specified
+        commutative and associative binary operator.
+        """
+        return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])
+
+    def reduceByKey(self, func, numPartitions=None):
+        """
+        Merge the value for each key using an associative reduce function.
+
+        This will also perform the merging locally on each mapper before
+        sending results to reducer, similarly to a combiner in MapReduce.
+
+        Output will be hash-partitioned with C{numPartitions} partitions, or
+        the default parallelism level if C{numPartitions} is not specified.
+        """
+        return

[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18203052
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.python
+
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.api.java._
+import org.apache.spark.api.python._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Interval, Duration, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.api.java._
+
+
+/**
+ * Interface for Python callback function with three arguments
+ */
+trait PythonRDDFunction {
+  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
+}
+
+/**
+ * Wrapper for PythonRDDFunction
+ */
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
+  extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {
+
+  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
+    if (rdd.isDefined) {
+      JavaRDD.fromRDD(rdd.get)
+    } else {
+      null
+    }
+  }
+
+  def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = {
+    if (jrdd != null) {
+      Some(jrdd.rdd)
+    } else {
+      None
+    }
+  }
+
+  def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
+    some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava))
+  }
+
+  def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
+    some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), wrapRDD(rdd2)).asJava))
+  }
+
+  // for JFunction2
+  def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = {
+    pfunc.call(time.milliseconds, rdds)
+  }
+}
+
+private[python]
+abstract class PythonDStream(parent: DStream[_]) extends DStream[Array[Byte]](parent.ssc) {
+
+  override def dependencies = List(parent)
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
+
+private[spark] object PythonDStream {
+
+  // helper function for DStream.foreachRDD();
+  // it cannot be named `foreachRDD`, which would confuse py4j
+  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pyfunc: PythonRDDFunction) {
+    val func = new RDDFunction(pyfunc)
+    jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
+  }
+
+  // helper function for ssc.transform()
+  def callTransform(ssc: JavaStreamingContext, jdsteams: JList[JavaDStream[_]],
+      pyfunc: PythonRDDFunction): JavaDStream[Array[Byte]] = {
+    val func = new RDDFunction(pyfunc)
+    ssc.transform(jdsteams, func)
+  }
+
+  // convert a list of RDDs into a queue of RDDs, for ssc.queueStream()
+  def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): java.util.Queue[JavaRDD[Array[Byte]]] = {
+    val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
+    rdds.forall(queue.add(_))
+    queue
+  }
+}
+
+/**
+ * Transformed DStream in Python.
+ *
+ * If the result RDD is a PythonRDD, it will be cached as a template for future use;
+ * this can reduce the number of Python callbacks.
+ */
+private[spark]
+class PythonTransformedDStream(parent: DStream[_], pfunc: PythonRDDFunction,
+    var reuse: Boolean = false)
+  extends PythonDStream(parent) {
+
+  val func = new RDDFunction(pfunc)
+  var lastResult: PythonRDD = _
+
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] 

[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2538#discussion_r18203252
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.python
+
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.api.java._
+import org.apache.spark.api.python._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Interval, Duration, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.api.java._
+
+
+/**
+ * Interface for Python callback function with three arguments
+ */
+trait PythonRDDFunction {
+  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
+}
+
+/**
+ * Wrapper for PythonRDDFunction
+ */
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
--- End diff --

I had tried an implicit conversion, but it failed: in the case of func(rdd, rdd2, 
time), Scala cannot create an RDDFunction for it automatically.
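
A minimal, self-contained sketch of the arity problem described above; the trait and
names below are simplified stand-ins for PythonRDDFunction/RDDFunction, not the PR's
actual types:

    import scala.language.implicitConversions

    // A Java-style callback trait, analogous to PythonRDDFunction.
    trait Wrapped {
      def call(time: Long): String
    }

    object ImplicitArityDemo {
      // An implicit view from a one-argument function to the wrapper trait.
      implicit def fromOneArg(f: Long => String): Wrapped =
        new Wrapped { def call(time: Long): String = f(time) }

      def use(w: Wrapped): Unit = println(w.call(0L))

      def main(args: Array[String]): Unit = {
        use((t: Long) => s"one-arg at $t")      // the implicit view applies here
        // use((a: Long, b: Long) => s"$a $b")  // would not compile: there is no view
        //                                      // from a two-argument function to Wrapped
      }
    }

This is roughly why the explicit `new RDDFunction(pfunc)` wrapper is used instead of
relying on an implicit conversion.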


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/2538#issuecomment-57279309
  
@giwa After fixing the problem of the ever-increasing number of partitions (which was 
causing a performance problem), the tests now run very stably. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/2538#issuecomment-57279408
  
@tdas I should have addressed all your comments (or left a comment where I didn't); 
please take another look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2538#issuecomment-57279516
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21031/consoleFull)
 for   PR 2538 at commit 
[`b98d63f`](https://github.com/apache/spark/commit/b98d63fbde10f20a42e1e6e0f34f45736b802772).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2330#issuecomment-57279534
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21032/consoleFull)
 for   PR 2330 at commit 
[`ad09236`](https://github.com/apache/spark/commit/ad092361f649b82dff64c44a30b50af1e90c).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2538#issuecomment-57279780
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21030/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3713][SQL] Uses JSON to serialize DataT...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2563#issuecomment-57279975
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/210/consoleFull)
 for   PR 2563 at commit 
[`03da3ec`](https://github.com/apache/spark/commit/03da3ec870940bd6ff56e03450993da6125b40a4).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3652] [SQL] upgrade spark sql hive vers...

2014-09-30 Thread liancheng
Github user liancheng commented on the pull request:

https://github.com/apache/spark/pull/2499#issuecomment-57280380
  
@scwf A shim layer seems reasonable if we can make clean abstractions. A 
major issue is that the original `HiveServer`/`HiveServer2` were not designed to be 
extended by other applications, which is why we have to use reflection tricks to 
implement `HiveThriftServer2` and `SparkSQLCLIDriver`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3689] FileLogger should create new inst...

2014-09-30 Thread sarutak
Github user sarutak closed the pull request at:

https://github.com/apache/spark/pull/2534


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3689] FileLogger should create new inst...

2014-09-30 Thread sarutak
Github user sarutak commented on the pull request:

https://github.com/apache/spark/pull/2534#issuecomment-57280754
  
I noticed it's only needed for HDFS, so I'm closing this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2432#issuecomment-57280938
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21027/consoleFull)
 for   PR 2432 at commit 
[`42bea55`](https://github.com/apache/spark/commit/42bea55e3a4b665f8b3a6f9a423a2a199ea55093).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `public class ApplicationId implements Serializable `
  * `  case class KillExecutor(`
  * `  case class MasterChangeAcknowledged(appId: ApplicationId)`
  * `  case class RegisteredApplication(appId: ApplicationId, masterUrl: 
String) extends DeployMessage`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2432#issuecomment-57280945
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21027/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...

2014-09-30 Thread sarutak
Github user sarutak commented on the pull request:

https://github.com/apache/spark/pull/2432#issuecomment-57281035
  
retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [WIP][SPARK-3247][SQL] An API for adding forei...

2014-09-30 Thread liancheng
Github user liancheng commented on the pull request:

https://github.com/apache/spark/pull/2475#issuecomment-57281203
  
Looking at this from a user's perspective, mixing all kinds of data sources is 
cool, but as a counterpart to `TableScan` we also need a `TableInsert` trait to 
close the loop. Usually a foreign data source should implement `TableScan`, and 
optionally implement `TableInsert`, which should generally be easier to 
implement than the former.
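
For concreteness, a rough sketch of what the pair of traits discussed here could look 
like; the names and signatures are assumptions based on this comment, not the API that 
was eventually merged:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row

    // Hypothetical shapes only.
    trait TableScan {
      // Produce the rows of the foreign table.
      def buildScan(): RDD[Row]
    }

    trait TableInsert {
      // Write rows back to the foreign table; overwrite = true replaces existing data.
      def insert(data: RDD[Row], overwrite: Boolean): Unit
    }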


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2432#issuecomment-57281304
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21033/consoleFull)
 for   PR 2432 at commit 
[`42bea55`](https://github.com/apache/spark/commit/42bea55e3a4b665f8b3a6f9a423a2a199ea55093).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] Scalastyle is neve...

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2520#issuecomment-57281411
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21028/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] Scalastyle is neve...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2520#issuecomment-57281406
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21028/consoleFull)
 for   PR 2520 at commit 
[`641b9e7`](https://github.com/apache/spark/commit/641b9e704d72b4111b1fb8662c523e31559980d8).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread giwa
Github user giwa commented on the pull request:

https://github.com/apache/spark/pull/2538#issuecomment-57281449
  
@davies That's great!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] Scalastyle is neve...

2014-09-30 Thread sarutak
Github user sarutak commented on the pull request:

https://github.com/apache/spark/pull/2520#issuecomment-57281529
  
retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-1830 Deploy failover, Make Persistence e...

2014-09-30 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/771#discussion_r18204443
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
 ---
@@ -79,11 +80,9 @@ private[spark] class FileSystemPersistenceEngine(
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
 
-    val serializer = serialization.findSerializerFor(value)
-    val serialized = serializer.toBinary(value)
-
+    val serialized = serializer.serialize(value)
     val out = new FileOutputStream(file)
-    out.write(serialized)
+    out.write(serialized.array())
--- End diff --

I did not understand what you meant here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] Scalastyle is neve...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2520#issuecomment-57282193
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21034/consoleFull)
 for   PR 2520 at commit 
[`641b9e7`](https://github.com/apache/spark/commit/641b9e704d72b4111b1fb8662c523e31559980d8).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3741] Make ConnectionManager propagate ...

2014-09-30 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/2593

[SPARK-3741] Make ConnectionManager propagate errors properly and add mo...

...re logs to avoid Executors swallowing errors

This PR made the following changes:
* Register a callback to `Connection` so that the error will be propagated 
properly.
* Add more logs so that the errors won't be swallowed by Executors.
* Use trySuccess/tryFailure because `Promise` does not allow success/failure to be 
called more than once (a small sketch of the difference follows below).
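
A small standalone sketch (plain Scala, no Spark required) of the `Promise` behavior 
behind that last point: `success`/`failure` throw if the promise has already been 
completed, while `trySuccess`/`tryFailure` simply report whether the completion won.

    import scala.concurrent.Promise

    object PromiseCompletionDemo {
      def main(args: Array[String]): Unit = {
        val p = Promise[Int]()

        p.success(1)                    // first completion succeeds
        // p.success(2)                 // would throw IllegalStateException: promise already completed

        val accepted = p.trySuccess(2)  // safe to call again
        println(accepted)               // false
        println(p.future.value)         // Some(Success(1)) -- the first value is kept
      }
    }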

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-3741

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2593.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2593


commit 764aec5f256babf5baff20cb0c0cf5532c2a5f20
Author: zsxwing zsxw...@gmail.com
Date:   2014-09-30T07:04:16Z

[SPARK-3741] Make ConnectionManager propagate errors properly and add more 
logs to avoid Executors swallowing errors




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3741] Make ConnectionManager propagate ...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2593#issuecomment-57283216
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21035/consoleFull)
 for   PR 2593 at commit 
[`764aec5`](https://github.com/apache/spark/commit/764aec5f256babf5baff20cb0c0cf5532c2a5f20).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2538#issuecomment-57284243
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21029/consoleFull)
 for   PR 2538 at commit 
[`e00136b`](https://github.com/apache/spark/commit/e00136b3dfd330689d89e44006a49871b36a4825).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class StreamingContext(object):`
  * `class DStream(object):`
  * `class TransformedDStream(DStream):`
  * `class RDDFunction(object):`
  * `abstract class PythonDStream(parent: DStream[_], pfunc: 
PythonRDDFunction)`
  * `class PythonTransformedDStream (parent: DStream[_], pfunc: 
PythonRDDFunction,`
  * `class PythonTransformed2DStream(parent: DStream[_], parent2: 
DStream[_],`
  * `class PythonStateDStream(parent: DStream[Array[Byte]], reduceFunc: 
PythonRDDFunction)`
  * `class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2538#issuecomment-57284254
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21029/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2591#issuecomment-57284538
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/212/consoleFull)
 for   PR 2591 at commit 
[`ab99cc0`](https://github.com/apache/spark/commit/ab99cc0b3cbaa3f83c0feb40436cdf389905b658).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3718] FsHistoryProvider should consider...

2014-09-30 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the pull request:

https://github.com/apache/spark/pull/2573#issuecomment-57285525
  
Actually the HistoryServer can read application logs generated by Spark apps on 
another node, and `spark.eventLog.dir` could be configured differently on the two 
sides, so in my opinion it is more flexible to keep the two configs separate.
Also, `spark.eventLog.dir` only takes effect when `spark.eventLog.enabled` is 
true. If the HistoryServer loads data from `spark.eventLog.dir`, is it really necessary 
for it to check the value of `spark.eventLog.enabled`? 
In a word, the current solution is simple and loosely coupled.
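
As a concrete illustration of that separation (the HDFS path below is made up): the 
application only needs the `spark.eventLog.*` settings, while a HistoryServer running 
anywhere can point `spark.history.fs.logDirectory` at the same directory and never has 
to consult `spark.eventLog.enabled`.

    import org.apache.spark.{SparkConf, SparkContext}

    object EventLogExample {
      def main(args: Array[String]): Unit = {
        // Application side: enable event logging and pick a shared directory.
        // (The master URL is supplied by spark-submit.)
        val conf = new SparkConf()
          .setAppName("event-log-example")
          .set("spark.eventLog.enabled", "true")
          .set("spark.eventLog.dir", "hdfs:///shared/spark-events")
        val sc = new SparkContext(conf)
        sc.parallelize(1 to 10).count()
        sc.stop()

        // HistoryServer side (a separate process, possibly on another node):
        //   spark.history.fs.logDirectory=hdfs:///shared/spark-events
      }
    }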


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...

2014-09-30 Thread nchammas
Github user nchammas commented on the pull request:

https://github.com/apache/spark/pull/2588#issuecomment-57285986
  
> @nchammas - would you be interested in submitting a pr to change the qa 
script so that the timeout and failure message already prints the commit hash?

@rxin Straight failures should already include the commit hash, [like 
here](https://github.com/apache/spark/pull/2588#issuecomment-57272186). (Note 
that messages like [this 
one](https://github.com/apache/spark/pull/2588#issuecomment-57272188) do not 
come from our script.)

I can make a PR to add the commit hash to the timeout messages.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2377] Python API for Streaming

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2538#issuecomment-57286041
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21031/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


