[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18200173 --- Diff: bin/pyspark --- @@ -87,11 +87,7 @@ export PYSPARK_SUBMIT_ARGS if [[ -n $SPARK_TESTING ]]; then unset YARN_CONF_DIR unset HADOOP_CONF_DIR - if [[ -n $PYSPARK_DOC_TEST ]]; then -exec $PYSPARK_PYTHON -m doctest $1 - else -exec $PYSPARK_PYTHON $1 - fi + exec $PYSPARK_PYTHON $1 --- End diff -- If that is easy to do, then that is a better idea. Since this PR is already so big, let's change as little of the existing infrastructure as possible. Otherwise, if existing PySpark breaks in some weird way, it will be hard to revert this commit without reverting all of pyspark streaming.
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18200181 --- Diff: python/pyspark/accumulators.py --- @@ -256,3 +256,8 @@ def _start_update_server(): thread.daemon = True thread.start() return server + + +if __name__ == "__main__": --- End diff -- Same logic as I mentioned earlier. If it is easy to undo the refactoring and still be able to run the tests, let's try to do that.
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18200194 --- Diff: python/pyspark/serializers.py --- @@ -114,6 +114,9 @@ def __ne__(self, other): def __repr__(self): return "%s object" % self.__class__.__name__ +def __hash__(self): +return hash(str(self)) --- End diff -- Similar question: are the changes in this file necessary for streaming, or were they part of the refactoring?
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18200248 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds --- End diff -- I didn't want to pollute the namespace inside the BlockManager class any more than absolutely necessary. :)
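The design choice described here keeps the TTL configuration read local to `getPeers` rather than adding yet another field to `BlockManager`. Below is a minimal, self-contained sketch of that TTL-refresh pattern; the class name and structure are hypothetical, and only the `spark.storage.cachedPeersTtl` key and its default come from the quoted diff.

```scala
import org.apache.spark.SparkConf

// Hypothetical sketch of a TTL-cached lookup: the TTL is read inside the
// method so the enclosing class does not need an extra configuration field.
class CachedLookup[T](conf: SparkConf, fetch: () => T) {
  private val lock = new Object
  private var cached: Option[T] = None
  private var lastFetchTime = 0L

  def get(forceFetch: Boolean = false): T = lock.synchronized {
    val ttlMs = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
    val expired = System.currentTimeMillis - lastFetchTime > ttlMs
    if (cached.isEmpty || forceFetch || expired) {
      cached = Some(fetch())
      lastFetchTime = System.currentTimeMillis
    }
    cached.get
  }
}
```

A caller would wire in the actual fetch as the closure, e.g. `new CachedLookup(conf, () => master.getPeers(blockManagerId))`, mirroring how the PR refreshes the peer list from the master.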
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18200286 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -111,6 +112,9 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) + @volatile private var cachedPeers: Seq[BlockManagerId] = _ --- End diff -- Good idea. Should have done that myself.
[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/2590#discussion_r18200306 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.language.implicitConversions +import scala.util.parsing.combinator.syntactical.StandardTokenParsers +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.input.CharArrayReader.EofCh +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.SqlLexical +import scala.util.parsing.combinator.lexical.StdLexical + +/** + * A simple Hive SQL pre parser. It parses the commands like cache,uncache etc and + * remaining actual query will be parsed by HiveQl.parseSql + */ --- End diff -- Since this class takes over all HiveQL parsing work (although it delegates to an underlying Hive parser), it's not accurate to call it a pre-parser. Maybe something like this: "A parser that recognizes all HiveQL constructs together with several Spark SQL specific extensions like `CACHE TABLE` and `UNCACHE TABLE`."
[GitHub] spark pull request: [SPARK-3412][SQL]add missing row api
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2529#issuecomment-57270939 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21023/
[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/2590#discussion_r18200319 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.language.implicitConversions +import scala.util.parsing.combinator.syntactical.StandardTokenParsers +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.input.CharArrayReader.EofCh +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.SqlLexical +import scala.util.parsing.combinator.lexical.StdLexical + +/** + * A simple Hive SQL pre parser. It parses the commands like cache,uncache etc and + * remaining actual query will be parsed by HiveQl.parseSql + */ +class HiveSqlParser extends StandardTokenParsers with PackratParsers { --- End diff -- Maybe `ExtendedHiveQLParser` is a better name? Since HiveQL is the definitive name and we're adding extensions to it in Spark SQL.
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18200328 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } + cachedPeers +} + } + + /** * Replicate block to another node. --- End diff -- Done.
[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/2590#discussion_r18200330 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.language.implicitConversions +import scala.util.parsing.combinator.syntactical.StandardTokenParsers +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.input.CharArrayReader.EofCh +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.SqlLexical +import scala.util.parsing.combinator.lexical.StdLexical + +/** + * A simple Hive SQL pre parser. It parses the commands like cache,uncache etc and + * remaining actual query will be parsed by HiveQl.parseSql + */ +class HiveSqlParser extends StandardTokenParsers with PackratParsers { + + def apply(input: String): LogicalPlan = { +// Special-case out set commands since the value fields can be +// complex to handle without RegexParsers. Also this approach +// is clearer for the several possible cases of set commands. +if (input.trim.toLowerCase.startsWith("set")) { --- End diff -- I'd like to handle `SET` with parser combinators too, but we can leave this to another PR.
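For reference, handling `SET` with parser combinators (as suggested above) could look roughly like the sketch below. This is a hypothetical, simplified illustration: it is case-sensitive and only accepts identifier-shaped keys and values, whereas real `SET` values can be arbitrary strings, which is exactly why the PR special-cases them.

```scala
import scala.util.parsing.combinator.syntactical.StandardTokenParsers

// Hypothetical sketch: parsing "SET [key[=value]]" with parser combinators
// instead of special-casing the command with string operations.
object SetCommandParser extends StandardTokenParsers {
  lexical.reserved += "SET"
  lexical.delimiters += "="

  // SET             -> (None, None)
  // SET key         -> (Some(key), None)
  // SET key = value -> (Some(key), Some(value))
  def set: Parser[(Option[String], Option[String])] =
    "SET" ~> opt(ident ~ opt("=" ~> ident)) ^^ {
      case None              => (None, None)
      case Some(key ~ value) => (Some(key), value)
    }

  def parse(input: String): (Option[String], Option[String]) =
    phrase(set)(new lexical.Scanner(input)) match {
      case Success(result, _) => result
      case failure            => sys.error(failure.toString)
    }
}
```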
[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/2590#discussion_r18200397 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala --- @@ -75,6 +75,9 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) { */ class HiveContext(sc: SparkContext) extends SQLContext(sc) { self => + + @transient + protected[sql] val hiveParser = new HiveSqlParser --- End diff -- Can we move this to `HiveQl`? It seems more reasonable to put all HiveQL parsing logic together.
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2588#issuecomment-57271331 Isn't there a way to augment the existing tests to make sure that the state in the driver (BlockManagerMaster) is cleared after removing blocks?
[GitHub] spark pull request: [SPARK-3412][SQL]add missing row api
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2529#issuecomment-57271368 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/208/consoleFull) for PR 2529 at commit [`4c18c29`](https://github.com/apache/spark/commit/4c18c29faedd52ca6a3d925ea039841b860862f7). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/2590#discussion_r18200443 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.language.implicitConversions +import scala.util.parsing.combinator.syntactical.StandardTokenParsers +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.input.CharArrayReader.EofCh +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.SqlLexical +import scala.util.parsing.combinator.lexical.StdLexical + +/** + * A simple Hive SQL pre parser. It parses the commands like cache,uncache etc and + * remaining actual query will be parsed by HiveQl.parseSql + */ +class HiveSqlParser extends StandardTokenParsers with PackratParsers { + + def apply(input: String): LogicalPlan = { +// Special-case out set commands since the value fields can be +// complex to handle without RegexParsers. Also this approach +// is clearer for the several possible cases of set commands. +if (input.trim.toLowerCase.startsWith(set)) { + input.trim.drop(3).split(=, 2).map(_.trim) match { +case Array() = // set + SetCommand(None, None) +case Array(key) = // set key + SetCommand(Some(key), None) +case Array(key, value) = // set key=value + SetCommand(Some(key), Some(value)) + } +} else if (input.trim.startsWith(!)) { + ShellCommand(input.drop(1)) +} else { + phrase(query)(new lexical.Scanner(input)) match { +case Success(r, x) = r +case x = sys.error(x.toString) + } +} + } + + protected case class Keyword(str: String) + + protected val CACHE = Keyword(CACHE) + protected val SET = Keyword(SET) + protected val ADD = Keyword(ADD) + protected val JAR = Keyword(JAR) + protected val TABLE = Keyword(TABLE) + protected val AS = Keyword(AS) + protected val UNCACHE = Keyword(UNCACHE) + protected val FILE = Keyword(FILE) + protected val DFS = Keyword(DFS) + protected val SOURCE = Keyword(SOURCE) + + protected implicit def asParser(k: Keyword): Parser[String] = +lexical.allCaseVersions(k.str).map(x = x : Parser[String]).reduce(_ | _) + + protected def allCaseConverse(k: String): Parser[String] = +lexical.allCaseVersions(k).map(x = x : Parser[String]).reduce(_ | _) + + protected val reservedWords = +this.getClass + .getMethods + .filter(_.getReturnType == classOf[Keyword]) + .map(_.invoke(this).asInstanceOf[Keyword].str) + + override val lexical = new SqlLexical(reservedWords) + + protected lazy val query: Parser[LogicalPlan] = ( +cache | unCache | addJar | addFile | dfs | source | hiveQl --- End diff -- Nit: I'd prefer `uncache` to `unCache`. 
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2588#issuecomment-57271603 Yes - can you submit one? I'm going to merge this because it has been blocking a lot of other patches.
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2588
[GitHub] spark pull request: SPARK-1830 Deploy failover, Make Persistence e...
Github user ScrapCodes commented on the pull request: https://github.com/apache/spark/pull/771#issuecomment-57271768 Because they were private[spark]. It is very inconvenient for someone to write his/her own recovery mode with all of that private[spark]. Plus, this felt like a developer-facing API, so putting those annotations seemed appropriate. What is your opinion on this?
[GitHub] spark pull request: [SPARK-3166]: Allow custom serialiser to be sh...
Github user ypwais commented on the pull request: https://github.com/apache/spark/pull/1890#issuecomment-57271841 Any chance this might make it into v1.2? I'd love to use custom {Input,Output}Formats (e.g. Parquet) and I personally spent almost a day after getting bitten by this classloader issue (especially since it doesn't manifest in local mode). And fixing serialization is nice too ^_^
[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/2590#discussion_r18200542 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.language.implicitConversions +import scala.util.parsing.combinator.syntactical.StandardTokenParsers +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.input.CharArrayReader.EofCh +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.SqlLexical +import scala.util.parsing.combinator.lexical.StdLexical + +/** + * A simple Hive SQL pre parser. It parses the commands like cache,uncache etc and + * remaining actual query will be parsed by HiveQl.parseSql + */ +class HiveSqlParser extends StandardTokenParsers with PackratParsers { + + def apply(input: String): LogicalPlan = { +// Special-case out set commands since the value fields can be +// complex to handle without RegexParsers. Also this approach +// is clearer for the several possible cases of set commands. 
+if (input.trim.toLowerCase.startsWith(set)) { + input.trim.drop(3).split(=, 2).map(_.trim) match { +case Array() = // set + SetCommand(None, None) +case Array(key) = // set key + SetCommand(Some(key), None) +case Array(key, value) = // set key=value + SetCommand(Some(key), Some(value)) + } +} else if (input.trim.startsWith(!)) { + ShellCommand(input.drop(1)) +} else { + phrase(query)(new lexical.Scanner(input)) match { +case Success(r, x) = r +case x = sys.error(x.toString) + } +} + } + + protected case class Keyword(str: String) + + protected val CACHE = Keyword(CACHE) + protected val SET = Keyword(SET) + protected val ADD = Keyword(ADD) + protected val JAR = Keyword(JAR) + protected val TABLE = Keyword(TABLE) + protected val AS = Keyword(AS) + protected val UNCACHE = Keyword(UNCACHE) + protected val FILE = Keyword(FILE) + protected val DFS = Keyword(DFS) + protected val SOURCE = Keyword(SOURCE) + + protected implicit def asParser(k: Keyword): Parser[String] = +lexical.allCaseVersions(k.str).map(x = x : Parser[String]).reduce(_ | _) + + protected def allCaseConverse(k: String): Parser[String] = +lexical.allCaseVersions(k).map(x = x : Parser[String]).reduce(_ | _) + + protected val reservedWords = +this.getClass + .getMethods + .filter(_.getReturnType == classOf[Keyword]) + .map(_.invoke(this).asInstanceOf[Keyword].str) + + override val lexical = new SqlLexical(reservedWords) + + protected lazy val query: Parser[LogicalPlan] = ( +cache | unCache | addJar | addFile | dfs | source | hiveQl + ) + + protected lazy val hiveQl: Parser[LogicalPlan] = +remainingQuery ^^ { + case r = HiveQl.parseSql(r.trim()) +} + + /** It returns all remaining query */ + protected lazy val remainingQuery: Parser[String] = new Parser[String] { +def apply(in:Input) = Success(in.source.subSequence(in.offset, in.source.length).toString, --- End diff -- Nit: space after `:` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2588#issuecomment-57271851 Actually, I took a look, and it does test that. So I am not sure how it was passing some of the time earlier.
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
GitHub user rxin opened a pull request: https://github.com/apache/spark/pull/2591 [SPARK-3709] Executors don't always report broadcast block removal properly back to the driver (for branch-1.1) You can merge this pull request into a Git repository by running: $ git pull https://github.com/rxin/spark SPARK-3709-1.1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2591.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2591 commit ab99cc0b3cbaa3f83c0feb40436cdf389905b658 Author: Reynold Xin r...@apache.org Date: 2014-09-30T06:20:39Z [SPARK-3709] Executors don't always report broadcast block removal properly back to the driver
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2591#issuecomment-57271920 We should also cherry-pick this into branch-1.0. The master branch has been fixed in https://github.com/apache/spark/pull/2588
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2588#issuecomment-57272107 It worked because askSlaves was true and the driver always queries the slaves in your afterUnpersist test. The problem is with regard to reporting, not whether the block itself has been dropped or not.
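To make that distinction concrete: a check that exercises only the driver-side (reported) state has to pass `askSlaves = false`. The sketch below is a hypothetical test helper; it assumes the `BlockManagerMaster.getMatchingBlockIds(filter, askSlaves)` API of this era and code living inside the `org.apache.spark` package (the class is `private[spark]`).

```scala
import org.apache.spark.storage.{BlockId, BlockManagerMaster, BroadcastBlockId}

// Hypothetical helper: with askSlaves = false the result reflects only what
// executors have reported back to the driver, so a missing "broadcast block
// removed" report shows up as a leftover entry here. askSlaves = true would
// also query the executors directly and could mask the missing report.
def unreportedBroadcastBlocks(master: BlockManagerMaster): Seq[BlockId] = {
  val isBroadcast = (id: BlockId) => id.isInstanceOf[BroadcastBlockId]
  master.getMatchingBlockIds(isBroadcast, askSlaves = false)
}
```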
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2588#issuecomment-57272188 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21022/
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2588#issuecomment-57272186 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21022/consoleFull) for PR 2588 at commit [`6dab2e3`](https://github.com/apache/spark/commit/6dab2e30b0f2f9e1cb38f4b0ed93fc57d65a978c). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2590#issuecomment-57272266 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/209/consoleFull) for PR 2590 at commit [`ba26cd1`](https://github.com/apache/spark/commit/ba26cd1a5895fa71a028992b5c946bd8333dfdc3). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...
Github user liancheng commented on the pull request: https://github.com/apache/spark/pull/2590#issuecomment-57272298 Awesome! I just started working on this last weekend and you've already got it done :) Left some minor comments. This generally LGTM.
[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2432#issuecomment-57272310 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21020/
[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2432#issuecomment-57272306 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21020/consoleFull) for PR 2432 at commit [`f6af132`](https://github.com/apache/spark/commit/f6af132cbfba32d2e7434f6b027f1cd6188ea6f2). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `public class ApplicationId implements Serializable ` * ` case class KillExecutor(` * ` case class MasterChangeAcknowledged(appId: ApplicationId)` * ` case class RegisteredApplication(appId: ApplicationId, masterUrl: String) extends DeployMessage`
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2591#issuecomment-57272501 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21024/
[GitHub] spark pull request: [SPARK-3713][SQL] Uses JSON to serialize DataT...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2563#issuecomment-57272666 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/210/consoleFull) for PR 2563 at commit [`03da3ec`](https://github.com/apache/spark/commit/03da3ec870940bd6ff56e03450993da6125b40a4). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2330#issuecomment-57272680 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21025/consoleFull) for PR 2330 at commit [`0dae310`](https://github.com/apache/spark/commit/0dae31022fa26abc806db94e02fa7c15a031d1c1). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18200872 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{ArrayList = JArrayList, List = JList} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.api.java._ +import org.apache.spark.api.python._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Interval, Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ + + +/** + * Interface for Python callback function with three arguments + */ +trait PythonRDDFunction { + def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]] +} + +/** + * Wrapper for PythonRDDFunction + */ +private[python] class RDDFunction(pfunc: PythonRDDFunction) --- End diff -- I think I get it. You need a simple trait / interface (`PythonRDDFunction`) that is used as the interface for creating python function objects through py4j. And then you need to convert it to a `Function2` object (using `RDDFunction` class) so that it can be passed on to `DStream.foreachRDD`. However, I am not sure this warrants a separate class with a confusing name. How about static conversions added to `object PythonRDDFunction` like ``` object PythonRDDFunction { implicit def toFunction2(): Option[Function2[]] = { } } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
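A slightly fleshed-out version of that suggestion might look like the sketch below. It is only illustrative, not the final shape of the PR: the trait signature and the `Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]]` target come from the quoted diff, while the conversion method itself is hypothetical.

```scala
import java.util.{List => JList}
import scala.language.implicitConversions

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{Function2 => JFunction2}
import org.apache.spark.streaming.Time

// The py4j-facing callback interface, as in the diff above.
trait PythonRDDFunction {
  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
}

// Hypothetical companion-object conversion instead of a separately named
// wrapper class: adapts the Python callback to the Function2 shape that the
// Java DStream APIs (e.g. foreachRDD, transform) expect.
object PythonRDDFunction {
  implicit def toFunction2(pfunc: PythonRDDFunction)
      : JFunction2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] =
    new JFunction2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] {
      def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] =
        pfunc.call(time.milliseconds, rdds)
    }
}
```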
[GitHub] spark pull request: [SPARK-3734] DriverRunner should not read SPAR...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/2586#issuecomment-57272762 LGTM. Merging into master and 1.1
[GitHub] spark pull request: [SPARK-3412][SQL]add missing row api
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2529#issuecomment-57272803 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/207/consoleFull) for PR 2529 at commit [`4c18c29`](https://github.com/apache/spark/commit/4c18c29faedd52ca6a3d925ea039841b860862f7). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3734] DriverRunner should not read SPAR...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2586
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18200937 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{ArrayList = JArrayList, List = JList} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.api.java._ +import org.apache.spark.api.python._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Interval, Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ + + +/** + * Interface for Python callback function with three arguments + */ +trait PythonRDDFunction { --- End diff -- The name is very non-intuitive. This is just an interface for defining functions that can be used in `Dstream.transform`. A more intuitive name (at least to me) is `DStreamTransformFunction`. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/2432#discussion_r18200976 --- Diff: core/src/main/java/org/apache/spark/ApplicationId.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; + +/** + * This class represents unique application id for identifying each application + */ +public class ApplicationId implements Serializable { + + private String appId; + + public ApplicationId(String appId) { +this.appId = appId; + } + + @Override + public boolean equals(Object other) { +if (!(other instanceof ApplicationId)) { + return false; +} else if (other == this) { + return true; +} else if (appId != null) { + return appId.equals(((ApplicationId)other).appId); +} else { + return false; +} + } + + @Override + public int hashCode() { +return appId.hashCode(); + } + + @Override + public String toString() { +return appId; + } + + + private static ApplicationId defaultAppId = +new ApplicationId(String.valueOf(System.currentTimeMillis())); --- End diff -- O.K. I've modified that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18201011 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{ArrayList = JArrayList, List = JList} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.api.java._ +import org.apache.spark.api.python._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Interval, Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ + + +/** + * Interface for Python callback function with three arguments + */ +trait PythonRDDFunction { + def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]] +} + +/** + * Wrapper for PythonRDDFunction + */ +private[python] class RDDFunction(pfunc: PythonRDDFunction) + extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable { + + def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = { --- End diff -- Since they are not used outside `RDDFunction`, they should at least be private to this class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18201085 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{ArrayList = JArrayList, List = JList} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.api.java._ +import org.apache.spark.api.python._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Interval, Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ + + +/** + * Interface for Python callback function with three arguments + */ +trait PythonRDDFunction { + def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]] +} + +/** + * Wrapper for PythonRDDFunction + */ +private[python] class RDDFunction(pfunc: PythonRDDFunction) + extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable { + + def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = { +if (rdd.isDefined) { + JavaRDD.fromRDD(rdd.get) +} else { + null +} + } + + def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = { +if (jrdd != null) { + Some(jrdd.rdd) +} else { + None +} + } + + def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava)) + } + + def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), wrapRDD(rdd2)).asJava)) + } + + // for JFunction2 + def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = { +pfunc.call(time.milliseconds, rdds) + } +} + +private[python] +abstract class PythonDStream(parent: DStream[_]) extends DStream[Array[Byte]] (parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + val asJavaDStream = JavaDStream.fromDStream(this) +} + +private[spark] object PythonDStream { --- End diff -- Why `private[spark]` and not `private[python]` ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
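For readers less familiar with Scala's qualified access modifiers, the difference being asked about is scope width. A minimal illustration (object names hypothetical):

```scala
package org.apache.spark.streaming.api.python

// private[python] limits access to org.apache.spark.streaming.api.python,
// while private[spark] exposes the object to everything under org.apache.spark.
private[python] object VisibleToPythonApiPackageOnly
private[spark] object VisibleAcrossAllOfSpark
```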
[GitHub] spark pull request: [SPARK-2706][SQL] Enable Spark to support Hive...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/2241#issuecomment-57273157 Hey @zhzhan I've published a modified version of Hive 0.13 that we can link against. A few of the benefits: 1. I fixed the hive-exec jar so it only contains Hive packages and not a bunch of other code. 2. I'm using a shaded version of the protobuf dependency (otherwise, this interferes with protobuf found in older Hadoop client versions). https://oss.sonatype.org/content/repositories/orgspark-project-1077/org/spark-project/hive/ For now you'll need to add this repo to the build to use it, but if it all works I can just fully publish this to Maven Central. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
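For anyone who wants to try the staged artifacts, adding an extra repository to an sbt build is a one-liner; the sketch below (for build.sbt) is generic, and the resolver name and the commented-out dependency coordinates are illustrative rather than the exact coordinates from the comment above:

```scala
// Make the staged org.spark-project.hive artifacts resolvable.
// The resolver name is arbitrary; the URL is the repository root linked above.
resolvers += "spark-project-staging" at
  "https://oss.sonatype.org/content/repositories/orgspark-project-1077/"

// Then depend on the shaded artifacts published there, for example:
// libraryDependencies += "org.spark-project.hive" % "hive-exec" % "<staged version>"
```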
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18201096 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{ArrayList = JArrayList, List = JList} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.api.java._ +import org.apache.spark.api.python._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Interval, Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ + + +/** + * Interface for Python callback function with three arguments + */ +trait PythonRDDFunction { + def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]] +} + +/** + * Wrapper for PythonRDDFunction + */ +private[python] class RDDFunction(pfunc: PythonRDDFunction) + extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable { + + def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = { +if (rdd.isDefined) { + JavaRDD.fromRDD(rdd.get) +} else { + null +} + } + + def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = { +if (jrdd != null) { + Some(jrdd.rdd) +} else { + None +} + } + + def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava)) + } + + def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), wrapRDD(rdd2)).asJava)) + } + + // for JFunction2 + def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = { +pfunc.call(time.milliseconds, rdds) + } +} + +private[python] +abstract class PythonDStream(parent: DStream[_]) extends DStream[Array[Byte]] (parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + val asJavaDStream = JavaDStream.fromDStream(this) +} + +private[spark] object PythonDStream { + + // helper function for DStream.foreachRDD(), + // cannot be `foreachRDD`, it will confusing py4j + def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pyfunc: PythonRDDFunction){ +val func = new RDDFunction(pyfunc) +jdstream.dstream.foreachRDD((rdd, time) = func(Some(rdd), time)) + } + + // helper function for ssc.transform() + def callTransform(ssc: JavaStreamingContext, jdsteams: JList[JavaDStream[_]], +pyfunc: PythonRDDFunction) +:JavaDStream[Array[Byte]] = { +val func = new RDDFunction(pyfunc) +ssc.transform(jdsteams, func) + } + + // convert 
list of RDD into queue of RDDs, for ssc.queueStream() + def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): java.util.Queue[JavaRDD[Array[Byte]]] = { +val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]] +rdds.forall(queue.add(_)) +queue + } +} + +/** + * Transformed DStream in Python. + * + * If the result RDD is PythonRDD, then it will cache it as an template for future use, + * this can reduce the Python callbacks. + */ +private[spark] --- End diff -- Why `private[spark]` and not `private[python]`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r1820 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{ArrayList = JArrayList, List = JList} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.api.java._ +import org.apache.spark.api.python._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Interval, Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ + + +/** + * Interface for Python callback function with three arguments + */ +trait PythonRDDFunction { + def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]] +} + +/** + * Wrapper for PythonRDDFunction + */ +private[python] class RDDFunction(pfunc: PythonRDDFunction) + extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable { + + def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = { +if (rdd.isDefined) { + JavaRDD.fromRDD(rdd.get) +} else { + null +} + } + + def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = { +if (jrdd != null) { + Some(jrdd.rdd) +} else { + None +} + } + + def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava)) + } + + def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), wrapRDD(rdd2)).asJava)) + } + + // for JFunction2 + def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = { +pfunc.call(time.milliseconds, rdds) + } +} + +private[python] +abstract class PythonDStream(parent: DStream[_]) extends DStream[Array[Byte]] (parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + val asJavaDStream = JavaDStream.fromDStream(this) +} + +private[spark] object PythonDStream { + + // helper function for DStream.foreachRDD(), + // cannot be `foreachRDD`, it will confusing py4j + def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pyfunc: PythonRDDFunction){ +val func = new RDDFunction(pyfunc) +jdstream.dstream.foreachRDD((rdd, time) = func(Some(rdd), time)) + } + + // helper function for ssc.transform() + def callTransform(ssc: JavaStreamingContext, jdsteams: JList[JavaDStream[_]], +pyfunc: PythonRDDFunction) +:JavaDStream[Array[Byte]] = { +val func = new RDDFunction(pyfunc) +ssc.transform(jdsteams, func) + } + + // convert 
list of RDD into queue of RDDs, for ssc.queueStream() + def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): java.util.Queue[JavaRDD[Array[Byte]]] = { +val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]] +rdds.forall(queue.add(_)) +queue + } +} + +/** + * Transformed DStream in Python. + * + * If the result RDD is PythonRDD, then it will cache it as an template for future use, + * this can reduce the Python callbacks. + */ +private[spark] +class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction, +var reuse: Boolean = false) + extends PythonDStream(parent) { + + val func = new RDDFunction(pfunc) + var lastResult: PythonRDD = _ + + override def compute(validTime: Time): Option[RDD[Array[Byte]]] =
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18201102 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{ArrayList = JArrayList, List = JList} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.api.java._ +import org.apache.spark.api.python._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Interval, Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ + + +/** + * Interface for Python callback function with three arguments + */ +trait PythonRDDFunction { + def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]] +} + +/** + * Wrapper for PythonRDDFunction + */ +private[python] class RDDFunction(pfunc: PythonRDDFunction) + extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable { + + def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = { +if (rdd.isDefined) { + JavaRDD.fromRDD(rdd.get) +} else { + null +} + } + + def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = { +if (jrdd != null) { + Some(jrdd.rdd) +} else { + None +} + } + + def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava)) + } + + def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), wrapRDD(rdd2)).asJava)) + } + + // for JFunction2 + def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = { +pfunc.call(time.milliseconds, rdds) + } +} + +private[python] +abstract class PythonDStream(parent: DStream[_]) extends DStream[Array[Byte]] (parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + val asJavaDStream = JavaDStream.fromDStream(this) +} + +private[spark] object PythonDStream { + + // helper function for DStream.foreachRDD(), + // cannot be `foreachRDD`, it will confusing py4j + def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pyfunc: PythonRDDFunction){ +val func = new RDDFunction(pyfunc) +jdstream.dstream.foreachRDD((rdd, time) = func(Some(rdd), time)) + } + + // helper function for ssc.transform() + def callTransform(ssc: JavaStreamingContext, jdsteams: JList[JavaDStream[_]], +pyfunc: PythonRDDFunction) +:JavaDStream[Array[Byte]] = { +val func = new RDDFunction(pyfunc) +ssc.transform(jdsteams, func) + } + + // convert 
list of RDD into queue of RDDs, for ssc.queueStream() + def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): java.util.Queue[JavaRDD[Array[Byte]]] = { +val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]] +rdds.forall(queue.add(_)) +queue + } +} + +/** + * Transformed DStream in Python. + * + * If the result RDD is PythonRDD, then it will cache it as an template for future use, + * this can reduce the Python callbacks. + */ +private[spark] +class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction, +var reuse: Boolean = false) + extends PythonDStream(parent) { + + val func = new RDDFunction(pfunc) + var lastResult: PythonRDD = _ + + override def compute(validTime: Time): Option[RDD[Array[Byte]]] =
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18201113 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{ArrayList = JArrayList, List = JList} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.api.java._ +import org.apache.spark.api.python._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Interval, Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ + + +/** + * Interface for Python callback function with three arguments + */ +trait PythonRDDFunction { + def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]] +} + +/** + * Wrapper for PythonRDDFunction + */ +private[python] class RDDFunction(pfunc: PythonRDDFunction) + extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable { + + def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = { +if (rdd.isDefined) { + JavaRDD.fromRDD(rdd.get) +} else { + null +} + } + + def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = { +if (jrdd != null) { + Some(jrdd.rdd) +} else { + None +} + } + + def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava)) + } + + def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), wrapRDD(rdd2)).asJava)) + } + + // for JFunction2 + def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = { +pfunc.call(time.milliseconds, rdds) + } +} + +private[python] +abstract class PythonDStream(parent: DStream[_]) extends DStream[Array[Byte]] (parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + val asJavaDStream = JavaDStream.fromDStream(this) +} + +private[spark] object PythonDStream { + + // helper function for DStream.foreachRDD(), + // cannot be `foreachRDD`, it will confusing py4j + def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pyfunc: PythonRDDFunction){ +val func = new RDDFunction(pyfunc) +jdstream.dstream.foreachRDD((rdd, time) = func(Some(rdd), time)) + } + + // helper function for ssc.transform() + def callTransform(ssc: JavaStreamingContext, jdsteams: JList[JavaDStream[_]], +pyfunc: PythonRDDFunction) +:JavaDStream[Array[Byte]] = { +val func = new RDDFunction(pyfunc) +ssc.transform(jdsteams, func) + } + + // convert 
list of RDD into queue of RDDs, for ssc.queueStream() + def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): java.util.Queue[JavaRDD[Array[Byte]]] = { +val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]] +rdds.forall(queue.add(_)) +queue + } +} + +/** + * Transformed DStream in Python. + * + * If the result RDD is PythonRDD, then it will cache it as an template for future use, + * this can reduce the Python callbacks. + */ +private[spark] +class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction, +var reuse: Boolean = false) + extends PythonDStream(parent) { + + val func = new RDDFunction(pfunc) + var lastResult: PythonRDD = _ + + override def compute(validTime: Time): Option[RDD[Array[Byte]]] =
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18201177 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt(spark.storage.cachedPeersTtl, 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug(Fetched peers from master: + cachedPeers.mkString([, ,, ])) + } + cachedPeers +} + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt(spark.storage.maxReplicationFailures, 1) +val numPeersToReplicateTo = level.replication - 1 +val peersForReplication = new ArrayBuffer[BlockManagerId] --- End diff -- Why 3? This will be as large as `cluster size - 1`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18201208 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt(spark.storage.cachedPeersTtl, 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug(Fetched peers from master: + cachedPeers.mkString([, ,, ])) + } + cachedPeers +} + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt(spark.storage.maxReplicationFailures, 1) +val numPeersToReplicateTo = level.replication - 1 +val peersForReplication = new ArrayBuffer[BlockManagerId] --- End diff -- nvm ignore that comment --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18201230 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt(spark.storage.cachedPeersTtl, 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug(Fetched peers from master: + cachedPeers.mkString([, ,, ])) + } + cachedPeers +} + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt(spark.storage.maxReplicationFailures, 1) +val numPeersToReplicateTo = level.replication - 1 +val peersForReplication = new ArrayBuffer[BlockManagerId] +val peersReplicatedTo = new ArrayBuffer[BlockManagerId] +val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var replicationFailed = false +var failures = 0 +var done = false + +// Get cached list of peers +peersForReplication ++= getPeers(forceFetch = false) + +// Get a random peer. Note that this selection of a peer is deterministic on the block id. +// So assuming the list of peers does not change and no replication failures, +// if there are multiple attempts in the same node to replicate the same block, +// the same set of peers will be selected. +def getRandomPeer(): Option[BlockManagerId] = { + // If replication had failed, then force update the cached list of peers and remove the peers + // that have been already used + if (replicationFailed) { +peersForReplication.clear() +peersForReplication ++= getPeers(forceFetch = true) +peersForReplication --= peersReplicatedTo +peersForReplication --= peersFailedToReplicateTo + } + if (!peersForReplication.isEmpty) { +Some(peersForReplication(random.nextInt(peersForReplication.size))) + } else { +None + } } -for (peer: BlockManagerId - cachedPeers) { - val start = System.nanoTime - data.rewind() - logDebug(sTry to replicate $blockId once; The size of the data is ${data.limit()} Bytes. + -sTo node: $peer) - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception = - logError(sFailed to replicate block to $peer, e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. 
+// +// This selection of a peer and replication is continued in a loop until one of the +// following 3 conditions is fulfilled: +// (i) specified number of peers have been replicated to +// (ii) too many failures in replicating to peers +// (iii) no peer left to replicate to +// +while (!done) { + getRandomPeer() match { +case Some(peer) = + try { +val onePeerStartTime = System.currentTimeMillis +data.rewind() +logTrace(sTrying to replicate $blockId of ${data.limit()} bytes to $peer) +blockTransferService.uploadBlockSync( + peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) +logTrace(sReplicated $blockId of ${data.limit()} bytes to $peer in %f ms + .format((System.currentTimeMillis - onePeerStartTime) / 1e3)) +
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18201286 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt(spark.storage.cachedPeersTtl, 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug(Fetched peers from master: + cachedPeers.mkString([, ,, ])) + } + cachedPeers +} + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt(spark.storage.maxReplicationFailures, 1) +val numPeersToReplicateTo = level.replication - 1 +val peersForReplication = new ArrayBuffer[BlockManagerId] +val peersReplicatedTo = new ArrayBuffer[BlockManagerId] +val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var replicationFailed = false +var failures = 0 +var done = false + +// Get cached list of peers +peersForReplication ++= getPeers(forceFetch = false) + +// Get a random peer. Note that this selection of a peer is deterministic on the block id. +// So assuming the list of peers does not change and no replication failures, +// if there are multiple attempts in the same node to replicate the same block, +// the same set of peers will be selected. +def getRandomPeer(): Option[BlockManagerId] = { + // If replication had failed, then force update the cached list of peers and remove the peers + // that have been already used + if (replicationFailed) { +peersForReplication.clear() +peersForReplication ++= getPeers(forceFetch = true) +peersForReplication --= peersReplicatedTo +peersForReplication --= peersFailedToReplicateTo + } + if (!peersForReplication.isEmpty) { +Some(peersForReplication(random.nextInt(peersForReplication.size))) + } else { +None + } } -for (peer: BlockManagerId - cachedPeers) { - val start = System.nanoTime - data.rewind() - logDebug(sTry to replicate $blockId once; The size of the data is ${data.limit()} Bytes. + -sTo node: $peer) - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception = - logError(sFailed to replicate block to $peer, e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. 
+// +// This selection of a peer and replication is continued in a loop until one of the +// following 3 conditions is fulfilled: +// (i) specified number of peers have been replicated to +// (ii) too many failures in replicating to peers +// (iii) no peer left to replicate to +// +while (!done) { + getRandomPeer() match { +case Some(peer) = + try { +val onePeerStartTime = System.currentTimeMillis +data.rewind() +logTrace(sTrying to replicate $blockId of ${data.limit()} bytes to $peer) +blockTransferService.uploadBlockSync( + peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) +logTrace(sReplicated $blockId of ${data.limit()} bytes to $peer in %f ms + .format((System.currentTimeMillis - onePeerStartTime) / 1e3)) +
[GitHub] spark pull request: [SPARK-3613] Record only average block size in...
Github user Ishiihara commented on the pull request: https://github.com/apache/spark/pull/2470#issuecomment-57274044 @rxin I looked through RoaringBitmap, and it is highly compressed compared with other bitmap implementations. I will start working on this and keep you updated on progress and any issues that come up during implementation. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
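For context, RoaringBitmap stores the 32-bit integer space in per-chunk containers (sorted arrays for sparse chunks, bitmaps for dense ones), which is where the compression comes from. A minimal usage sketch, purely illustrative of the library's API rather than of how it would actually be wired into the map-status code:

```scala
import org.roaringbitmap.RoaringBitmap

object RoaringExample {
  def main(args: Array[String]): Unit = {
    // e.g. flag which map output blocks were empty, instead of storing a size per block
    val emptyBlocks = new RoaringBitmap()
    emptyBlocks.add(3)
    emptyBlocks.add(17)
    emptyBlocks.add(100000)

    println(emptyBlocks.contains(3))      // true
    println(emptyBlocks.getCardinality)   // 3
    println(emptyBlocks.getSizeInBytes)   // stays small even with many entries
  }
}
```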
[GitHub] spark pull request: [SQL] Prevents per row dynamic dispatching and...
GitHub user liancheng opened a pull request: https://github.com/apache/spark/pull/2592 [SQL] Prevents per row dynamic dispatching and pattern matching when inserting Hive values Builds all wrappers at first according to object inspector types to avoid per row costs. TODO: - [ ] Micro benchmark You can merge this pull request into a Git repository by running: $ git pull https://github.com/liancheng/spark hive-value-wrapper Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2592.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2592 commit 499866606d079c30bae50d799bbe1ba1488ec343 Author: Cheng Lian lian.cs@gmail.com Date: 2014-09-30T06:55:09Z Prevents per row dynamic dispatching and pattern matching when inserting Hive values --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
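The core idea, resolving the object-inspector dispatch once per column instead of once per row, looks roughly like the following schematic sketch (made-up inspector types, not the PR's actual code):

```scala
// Schematic stand-ins for Hive object inspectors.
sealed trait Inspector
case object StringInspector extends Inspector
case object IntInspector extends Inspector

object WrapperSketch {
  // Per-row dynamic dispatch: the pattern match runs for every single value.
  def wrapSlow(value: Any, oi: Inspector): Any = oi match {
    case StringInspector => value.toString
    case IntInspector    => value.asInstanceOf[Int]
  }

  // Hoisted dispatch: build one wrapper closure per column up front,
  // so the per-row work is just a function call.
  def wrapperFor(oi: Inspector): Any => Any = oi match {
    case StringInspector => (v: Any) => v.toString
    case IntInspector    => (v: Any) => v.asInstanceOf[Int]
  }

  def main(args: Array[String]): Unit = {
    val inspectors = Seq(StringInspector, IntInspector)
    val wrappers = inspectors.map(wrapperFor)   // built once, before the row loop
    val row: Seq[Any] = Seq("spark", 42)
    val wrapped = row.zip(wrappers).map { case (v, w) => w(v) }
    println(wrapped)
  }
}
```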
[GitHub] spark pull request: [SPARK-3412][SQL]add missing row api
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2529#issuecomment-57274900 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/208/consoleFull) for PR 2529 at commit [`4c18c29`](https://github.com/apache/spark/commit/4c18c29faedd52ca6a3d925ea039841b860862f7). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SQL] Prevents per row dynamic dispatching and...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2592#issuecomment-57274920 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21026/consoleFull) for PR 2592 at commit [`4998666`](https://github.com/apache/spark/commit/499866606d079c30bae50d799bbe1ba1488ec343). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18201664 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala --- @@ -413,7 +413,7 @@ class StreamingContext private[streaming] ( dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[T] ): DStream[T] = { -new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) +new TransformedDStream[T](dstreams, transformFunc) --- End diff -- Without this change, ssc.transform() will fail to serialize transformFunc, because the callback function from Python is not serializable. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
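For context on why dropping the `clean` call is the fix: `SparkContext.clean` attempts to serialize the closure eagerly and throws if it cannot, and a Py4J callback proxy holds live gateway state that Java serialization rejects. The sketch below illustrates the failure mode with a stand-in class; it is not the actual Py4J proxy or the ClosureCleaner internals:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.net.Socket

// Stand-in for a Py4J callback proxy: it holds a live connection and is not
// java.io.Serializable.
class CallbackProxy(val connection: Socket)

object CleanSketch {
  // Roughly what an eager serializability check does.
  def ensureSerializable(obj: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(obj) finally oos.close()
  }

  def main(args: Array[String]): Unit = {
    val pyCallback = new CallbackProxy(new Socket())
    // A transform function that closes over the proxy cannot be serialized, so an
    // eager check at DStream-construction time would reject it even though the
    // Python code path never needs to ship this closure as-is.
    val transformFunc = (x: Int) => { pyCallback.hashCode(); x + 1 }
    try ensureSerializable(transformFunc)
    catch { case e: NotSerializableException => println(s"rejected: $e") }
  }
}
```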
[GitHub] spark pull request: [SPARK-3654][SQL] Implement all extended HiveQ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2590#issuecomment-57276077 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/209/consoleFull) for PR 2590 at commit [`ba26cd1`](https://github.com/apache/spark/commit/ba26cd1a5895fa71a028992b5c946bd8333dfdc3). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/2432#discussion_r18202042 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala --- @@ -34,9 +36,11 @@ private[spark] trait SchedulerBackend { /** * The application ID associated with the job, if any. + * It is expected that the subclasses of TaskScheduler or SchedulerBackend + * override this method and return an unique application ID. * - * @return The application ID, or None if the backend does not provide an ID. + * @return The application ID, if the backend does not provide an ID. */ - def applicationId(): Option[String] = None + def applicationId() = new ApplicationId(System.currentTimeMillis.toString) --- End diff -- I've checked Mesos' code and found the following. * A FrameworkID is assigned by the Mesos master to each process that executes a SchedulerDriver (for Mesos, MesosSchedulerDriver) and registers with it. Instances of MesosSchedulerDriver are contained in CoarseMesosSchedulerBackend/MesosSchedulerBackend. * When CoarseMesosSchedulerBackend/MesosSchedulerBackend#start is invoked, the Mesos master adds the framework to its list of registered frameworks. * When CoarseMesosSchedulerBackend/MesosSchedulerBackend#stop is invoked, the master removes the framework from that list. So I think we can use the FrameworkID as a unique application ID. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
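In Mesos terms the framework ID is handed to the scheduler in its `registered` callback, so a backend can capture it there and surface it as the application ID. A rough sketch of that shape, with simplified field and method names rather than the actual backend code:

```scala
// Simplified shape only; the real backends implement the full Mesos Scheduler API
// and the SchedulerBackend trait shown in the diff above.
class MesosBackendSketch {
  @volatile private var frameworkId: Option[String] = None

  // Mesos calls back with the assigned FrameworkID once registration with the
  // master succeeds (and the master drops it again when the framework stops).
  def onRegistered(assignedFrameworkId: String): Unit = {
    frameworkId = Some(assignedFrameworkId)
  }

  // The FrameworkID is unique per registered framework, so it can back a unique
  // application ID; fall back to a timestamp until registration completes.
  def applicationId(): String =
    frameworkId.getOrElse("app-" + System.currentTimeMillis)
}
```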
[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2432#issuecomment-57276520 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21027/consoleFull) for PR 2432 at commit [`42bea55`](https://github.com/apache/spark/commit/42bea55e3a4b665f8b3a6f9a423a2a199ea55093). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2330#issuecomment-57276579 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21025/consoleFull) for PR 2330 at commit [`0dae310`](https://github.com/apache/spark/commit/0dae31022fa26abc806db94e02fa7c15a031d1c1). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2330#issuecomment-57276584 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21025/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] Scalastyle is neve...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2520#issuecomment-57276930 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21028/consoleFull) for PR 2520 at commit [`641b9e7`](https://github.com/apache/spark/commit/641b9e704d72b4111b1fb8662c523e31559980d8). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2591#issuecomment-57277304 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/212/consoleFull) for PR 2591 at commit [`ab99cc0`](https://github.com/apache/spark/commit/ab99cc0b3cbaa3f83c0feb40436cdf389905b658). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18202539 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- @@ -43,9 +46,34 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable throw new NotImplementedError(clone() is not implemented.) } +private [spark] +object RandomSampler { + // Default random number generator used by random samplers + def rngDefault: Random = new XORShiftRandom + + // Default gap sampling maximum + // For sampling fractions = this value, the gap sampling optimization will be applied. + // Above this value, it is assumed that tradtional bernoulli sampling is faster. The + // optimal value for this will depend on the RNG. More expensive RNGs will tend to make + // the optimal value higher. The most reliable way to determine this value for a given RNG + // is to experiment. I would expect a value of 0.5 to be close in most cases. + def gsmDefault: Double = 0.4 + + // Default gap sampling epsilon + // When sampling random floating point values the gap sampling logic requires value 0. An + // optimal value for this parameter is at or near the minimum positive floating point value + // returned by nextDouble() for the RNG being used. + def epsDefault: Double = 5e-11 --- End diff -- Yeah I meant add .0 for clarity, then subtract `Double` for being overkill. I think this would be more consistent with Scala/Spark style that way, but at least I'd argue for the .0. Trivial here; more of a minor question for the whole code base. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
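Since the quoted comments reference the gap sampling optimization and the epsilon guard without showing them together, here is a self-contained sketch of the technique: draw the gap to the next accepted element from a geometric distribution instead of making one RNG call per element. It illustrates the idea only, not the PR's implementation:

```scala
import scala.util.Random

object GapSamplingSketch {
  // Sample each element with probability f, but advance in geometrically
  // distributed "gaps" so that small fractions need far fewer RNG calls.
  def gapSample[T](data: Iterator[T], f: Double, rng: Random = new Random): Iterator[T] = {
    require(f > 0.0 && f < 1.0)
    val eps = 5e-11 // keep the uniform draw strictly positive, as in the quoted comment
    new Iterator[T] {
      // Number of elements to skip before the next accepted one: Geometric(f).
      private def nextGap(): Long = {
        val u = math.max(rng.nextDouble(), eps)
        (math.log(u) / math.log(1.0 - f)).toLong
      }
      private var buffered: Option[T] = advance()
      private def advance(): Option[T] = {
        var skip = nextGap()
        while (skip > 0 && data.hasNext) { data.next(); skip -= 1 }
        if (data.hasNext) Some(data.next()) else None
      }
      override def hasNext: Boolean = buffered.isDefined
      override def next(): T = {
        val v = buffered.getOrElse(throw new NoSuchElementException("empty iterator"))
        buffered = advance()
        v
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val kept = gapSample((1 to 1000000).iterator, 0.001, new Random(42)).size
    println(s"kept roughly 1000 elements: $kept")
  }
}
```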
[GitHub] spark pull request: [SQL] Prevents per row dynamic dispatching and...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2592#issuecomment-57277565 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21026/consoleFull) for PR 2592 at commit [`4998666`](https://github.com/apache/spark/commit/499866606d079c30bae50d799bbe1ba1488ec343). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SQL] Prevents per row dynamic dispatching and...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2592#issuecomment-57277568 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21026/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18202776 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- @@ -53,56 +81,237 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable * @tparam T item type */ @DeveloperApi -class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) +class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false) extends RandomSampler[T, T] { - private[random] var rng: Random = new XORShiftRandom + // epsilon slop to avoid failure from floating point jitter --- End diff -- Ah right, I still had in mind that the check was using `ub-lb`, but it isn't. Below, in both proposed versions of the code, could you not also specially handle the case where ub-lb = 1? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18202866 --- Diff: python/pyspark/serializers.py --- @@ -114,6 +114,9 @@ def __ne__(self, other): def __repr__(self): return %s object % self.__class__.__name__ +def __hash__(self): +return hash(str(self)) --- End diff -- This is necessary; we need to check the serializers of DStreams. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2538#issuecomment-57278200 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21029/consoleFull) for PR 2538 at commit [`e00136b`](https://github.com/apache/spark/commit/e00136b3dfd330689d89e44006a49871b36a4825). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18202884 --- Diff: python/pyspark/streaming/context.py --- @@ -0,0 +1,243 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from py4j.java_collections import ListConverter +from py4j.java_gateway import java_import + +from pyspark import RDD +from pyspark.serializers import UTF8Deserializer +from pyspark.context import SparkContext +from pyspark.storagelevel import StorageLevel +from pyspark.streaming.dstream import DStream +from pyspark.streaming.util import RDDFunction + +__all__ = [StreamingContext] + + +def _daemonize_callback_server(): + +Hack Py4J to daemonize callback server --- End diff -- done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18202894 --- Diff: python/pyspark/streaming/dstream.py --- @@ -0,0 +1,633 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from itertools import chain, ifilter, imap +import operator +import time +from datetime import datetime + +from pyspark import RDD +from pyspark.storagelevel import StorageLevel +from pyspark.streaming.util import rddToFileName, RDDFunction +from pyspark.rdd import portable_hash +from pyspark.resultiterable import ResultIterable + +__all__ = [DStream] + + +class DStream(object): --- End diff -- done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18202907 --- Diff: python/pyspark/streaming/dstream.py --- @@ -0,0 +1,633 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from itertools import chain, ifilter, imap +import operator +import time +from datetime import datetime + +from pyspark import RDD +from pyspark.storagelevel import StorageLevel +from pyspark.streaming.util import rddToFileName, RDDFunction +from pyspark.rdd import portable_hash +from pyspark.resultiterable import ResultIterable + +__all__ = [DStream] + + +class DStream(object): +def __init__(self, jdstream, ssc, jrdd_deserializer): +self._jdstream = jdstream +self._ssc = ssc +self.ctx = ssc._sc +self._jrdd_deserializer = jrdd_deserializer +self.is_cached = False +self.is_checkpointed = False + +def context(self): + +Return the StreamingContext associated with this DStream + +return self._ssc + +def count(self): + +Return a new DStream which contains the number of elements in this DStream. + +return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() + +def sum(self): + +Add up the elements in this DStream. + +return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) + +def filter(self, f): + +Return a new DStream containing only the elements that satisfy predicate. + +def func(iterator): +return ifilter(f, iterator) +return self.mapPartitions(func, True) + +def flatMap(self, f, preservesPartitioning=False): + +Pass each value in the key-value pair DStream through flatMap function +without changing the keys: this also retains the original RDD's partition. + +def func(s, iterator): +return chain.from_iterable(imap(f, iterator)) +return self.mapPartitionsWithIndex(func, preservesPartitioning) + +def map(self, f, preservesPartitioning=False): + +Return a new DStream by applying a function to each element of DStream. + +def func(iterator): +return imap(f, iterator) +return self.mapPartitions(func, preservesPartitioning) + +def mapPartitions(self, f, preservesPartitioning=False): + +Return a new DStream by applying a function to each partition of this DStream. + +def func(s, iterator): +return f(iterator) +return self.mapPartitionsWithIndex(func, preservesPartitioning) + +def mapPartitionsWithIndex(self, f, preservesPartitioning=False): + +Return a new DStream by applying a function to each partition of this DStream, +while tracking the index of the original partition. + +return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, preservesPartitioning)) + +def reduce(self, func): + +Return a new DStream by reduceing the elements of this RDD using the specified +commutative and associative binary operator. 
+ +return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1]) + +def reduceByKey(self, func, numPartitions=None): + +Merge the value for each key using an associative reduce function. + +This will also perform the merging locally on each mapper before +sending results to reducer, similarly to a combiner in MapReduce. + +Output will be hash-partitioned with C{numPartitions} partitions, or +the default parallelism level if C{numPartitions} is not specified. + +return
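To make the API quoted above concrete, here is a minimal usage sketch against the PySpark streaming module this PR adds; the socket source, host/port, and variable names are illustrative assumptions rather than anything taken from the diff.

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Hypothetical setup: a local context with 1-second batches.
sc = SparkContext("local[2]", "dstream_sketch")
ssc = StreamingContext(sc, 1)

# Count words per batch using the DStream operations shown in the diff
# (flatMap, map, reduceByKey).
lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()
```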
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18203052 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{ArrayList = JArrayList, List = JList} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.api.java._ +import org.apache.spark.api.python._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Interval, Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ + + +/** + * Interface for Python callback function with three arguments + */ +trait PythonRDDFunction { + def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]] +} + +/** + * Wrapper for PythonRDDFunction + */ +private[python] class RDDFunction(pfunc: PythonRDDFunction) + extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable { + + def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = { +if (rdd.isDefined) { + JavaRDD.fromRDD(rdd.get) +} else { + null +} + } + + def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = { +if (jrdd != null) { + Some(jrdd.rdd) +} else { + None +} + } + + def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd)).asJava)) + } + + def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { +some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), wrapRDD(rdd2)).asJava)) + } + + // for JFunction2 + def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = { +pfunc.call(time.milliseconds, rdds) + } +} + +private[python] +abstract class PythonDStream(parent: DStream[_]) extends DStream[Array[Byte]] (parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + val asJavaDStream = JavaDStream.fromDStream(this) +} + +private[spark] object PythonDStream { + + // helper function for DStream.foreachRDD(), + // cannot be `foreachRDD`, it will confusing py4j + def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pyfunc: PythonRDDFunction){ +val func = new RDDFunction(pyfunc) +jdstream.dstream.foreachRDD((rdd, time) = func(Some(rdd), time)) + } + + // helper function for ssc.transform() + def callTransform(ssc: JavaStreamingContext, jdsteams: JList[JavaDStream[_]], +pyfunc: PythonRDDFunction) +:JavaDStream[Array[Byte]] = { +val func = new RDDFunction(pyfunc) +ssc.transform(jdsteams, func) + } + + // 
convert list of RDD into queue of RDDs, for ssc.queueStream() + def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): java.util.Queue[JavaRDD[Array[Byte]]] = { +val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]] +rdds.forall(queue.add(_)) +queue + } +} + +/** + * Transformed DStream in Python. + * + * If the result RDD is PythonRDD, then it will cache it as an template for future use, + * this can reduce the Python callbacks. + */ +private[spark] +class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction, +var reuse: Boolean = false) + extends PythonDStream(parent) { + + val func = new RDDFunction(pfunc) + var lastResult: PythonRDD = _ + + override def compute(validTime: Time): Option[RDD[Array[Byte]]]
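As a rough sketch of the Python-side calls that these Scala helpers support (ssc.queueStream via toRDDQueue, DStream.foreachRDD via callForeachRDD, and transformations via PythonTransformedDStream), the snippet below shows one way they might be exercised; the data, sizes, and function names are made up for illustration.

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "queue_stream_sketch")
ssc = StreamingContext(sc, 1)

# A list of RDDs on the Python side; toRDDQueue turns this into the
# java.util.Queue that the Scala queueStream API expects.
rdd_queue = [sc.parallelize(range(i * 10, (i + 1) * 10)) for i in range(3)]
stream = ssc.queueStream(rdd_queue)

# transform() goes through the PythonTransformedDStream path shown above;
# foreachRDD() is served by the callForeachRDD helper.
doubled = stream.transform(lambda rdd: rdd.map(lambda x: x * 2))

def dump(rdd):
    print(rdd.collect())

doubled.foreachRDD(dump)

ssc.start()
ssc.awaitTermination()
```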
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2538#discussion_r18203252 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{ArrayList = JArrayList, List = JList} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.api.java._ +import org.apache.spark.api.python._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Interval, Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ + + +/** + * Interface for Python callback function with three arguments + */ +trait PythonRDDFunction { + def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]] +} + +/** + * Wrapper for PythonRDDFunction + */ +private[python] class RDDFunction(pfunc: PythonRDDFunction) --- End diff -- I had tried implicit conversion, but failed. In the case of func(rdd, rdd2, time), scala can not create an RDDFunction for it automatically. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user davies commented on the pull request: https://github.com/apache/spark/pull/2538#issuecomment-57279309 @giwa After fixing the problem of the growing number of partitions (which was causing a performance problem), the tests now run very stably. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user davies commented on the pull request: https://github.com/apache/spark/pull/2538#issuecomment-57279408 @tdas I should have addressed all of your comments (or left a comment where I didn't); please take another look. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2538#issuecomment-57279516 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21031/consoleFull) for PR 2538 at commit [`b98d63f`](https://github.com/apache/spark/commit/b98d63fbde10f20a42e1e6e0f34f45736b802772). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2330#issuecomment-57279534 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21032/consoleFull) for PR 2330 at commit [`ad09236`](https://github.com/apache/spark/commit/ad092361f649b82dff64c44a30b50af1e90c). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2538#issuecomment-57279780 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21030/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3713][SQL] Uses JSON to serialize DataT...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2563#issuecomment-57279975 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/210/consoleFull) for PR 2563 at commit [`03da3ec`](https://github.com/apache/spark/commit/03da3ec870940bd6ff56e03450993da6125b40a4). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3652] [SQL] upgrade spark sql hive vers...
Github user liancheng commented on the pull request: https://github.com/apache/spark/pull/2499#issuecomment-57280380 @scwf A shim layer seems reasonable if we can make clean abstractions. A major issue is that the original `HiveServer`/`HiveServer2` were not designed to be extended by other applications, which is why we have to use reflection tricks to implement `HiveThriftServer2` and `SparkSQLCLIDriver`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3689] FileLogger should create new inst...
Github user sarutak closed the pull request at: https://github.com/apache/spark/pull/2534 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3689] FileLogger should create new inst...
Github user sarutak commented on the pull request: https://github.com/apache/spark/pull/2534#issuecomment-57280754 I noticed this is needed only for HDFS, so I'm closing this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2432#issuecomment-57280938 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21027/consoleFull) for PR 2432 at commit [`42bea55`](https://github.com/apache/spark/commit/42bea55e3a4b665f8b3a6f9a423a2a199ea55093). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `public class ApplicationId implements Serializable ` * ` case class KillExecutor(` * ` case class MasterChangeAcknowledged(appId: ApplicationId)` * ` case class RegisteredApplication(appId: ApplicationId, masterUrl: String) extends DeployMessage` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2432#issuecomment-57280945 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21027/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...
Github user sarutak commented on the pull request: https://github.com/apache/spark/pull/2432#issuecomment-57281035 retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [WIP][SPARK-3247][SQL] An API for adding forei...
Github user liancheng commented on the pull request: https://github.com/apache/spark/pull/2475#issuecomment-57281203 From a user's perspective, mixing all kinds of data sources is cool, but as a counterpart to `TableScan` we also need a `TableInsert` trait to close the loop. Usually a foreign data source should implement `TableScan`, and optionally implement `TableInsert`, which should generally be easier to implement than the former. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3377] [SPARK-3610] Metrics can be accid...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2432#issuecomment-57281304 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21033/consoleFull) for PR 2432 at commit [`42bea55`](https://github.com/apache/spark/commit/42bea55e3a4b665f8b3a6f9a423a2a199ea55093). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] Scalastyle is neve...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2520#issuecomment-57281411 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21028/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] Scalastyle is neve...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2520#issuecomment-57281406 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21028/consoleFull) for PR 2520 at commit [`641b9e7`](https://github.com/apache/spark/commit/641b9e704d72b4111b1fb8662c523e31559980d8). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user giwa commented on the pull request: https://github.com/apache/spark/pull/2538#issuecomment-57281449 @davies That's great! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] Scalastyle is neve...
Github user sarutak commented on the pull request: https://github.com/apache/spark/pull/2520#issuecomment-57281529 retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-1830 Deploy failover, Make Persistence e...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/771#discussion_r18204443 --- Diff: core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala --- @@ -79,11 +80,9 @@ private[spark] class FileSystemPersistenceEngine( val created = file.createNewFile() if (!created) { throw new IllegalStateException(Could not create file: + file) } -val serializer = serialization.findSerializerFor(value) -val serialized = serializer.toBinary(value) - +val serialized = serializer.serialize(value) val out = new FileOutputStream(file) -out.write(serialized) +out.write(serialized.array()) --- End diff -- I did not understand what you meant here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] Scalastyle is neve...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2520#issuecomment-57282193 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21034/consoleFull) for PR 2520 at commit [`641b9e7`](https://github.com/apache/spark/commit/641b9e704d72b4111b1fb8662c523e31559980d8). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3741] Make ConnectionManager propagate ...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/2593 [SPARK-3741] Make ConnectionManager propagate errors properly and add mo... ...re logs to avoid Executors swallowing errors This PR made the following changes: * Register a callback to `Connection` so that the error will be propagated properly. * Add more logs so that the errors won't be swallowed by Executors. * Use trySuccess/tryFailure because `Promise` doesn't allow calling success/failure more than once. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-3741 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2593.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2593 commit 764aec5f256babf5baff20cb0c0cf5532c2a5f20 Author: zsxwing zsxw...@gmail.com Date: 2014-09-30T07:04:16Z [SPARK-3741] Make ConnectionManager propagate errors properly and add more logs to avoid Executors swallowing errors --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3741] Make ConnectionManager propagate ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2593#issuecomment-57283216 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21035/consoleFull) for PR 2593 at commit [`764aec5`](https://github.com/apache/spark/commit/764aec5f256babf5baff20cb0c0cf5532c2a5f20). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2538#issuecomment-57284243 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21029/consoleFull) for PR 2538 at commit [`e00136b`](https://github.com/apache/spark/commit/e00136b3dfd330689d89e44006a49871b36a4825). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class StreamingContext(object):` * `class DStream(object):` * `class TransformedDStream(DStream):` * `class RDDFunction(object):` * `abstract class PythonDStream(parent: DStream[_], pfunc: PythonRDDFunction)` * `class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction,` * `class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],` * `class PythonStateDStream(parent: DStream[Array[Byte]], reduceFunc: PythonRDDFunction)` * `class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2538#issuecomment-57284254 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21029/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2591#issuecomment-57284538 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/212/consoleFull) for PR 2591 at commit [`ab99cc0`](https://github.com/apache/spark/commit/ab99cc0b3cbaa3f83c0feb40436cdf389905b658). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3718] FsHistoryProvider should consider...
Github user WangTaoTheTonic commented on the pull request: https://github.com/apache/spark/pull/2573#issuecomment-57285525 Actually, the HistoryServer can read application logs generated by Spark apps on another node, and `spark.eventLog.dir` could differ between the two. So in my opinion it is more flexible to keep the two configs separate. Also, `spark.eventLog.dir` only takes effect when `spark.eventLog.enabled` is true; if the HistoryServer loads data from `spark.eventLog.dir`, is it necessary to check the value of `spark.eventLog.enabled`? In short, the current solution is simple and loosely coupled. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
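For reference, a minimal application-side sketch of how the two settings relate, assuming the standard Spark config keys; the HDFS path is a placeholder, and the history-server key is mentioned only for context.

```python
from pyspark import SparkConf, SparkContext

# The application decides whether and where to write its event logs.
conf = (SparkConf()
        .setAppName("event_log_example")
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.dir", "hdfs:///spark-events"))  # placeholder path
sc = SparkContext(conf=conf)

# A HistoryServer, possibly running on another node, reads logs via its own
# setting (spark.history.fs.logDirectory), which may point at the same path
# or at a different one.
```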
[GitHub] spark pull request: [SPARK-3709] Executors don't always report bro...
Github user nchammas commented on the pull request: https://github.com/apache/spark/pull/2588#issuecomment-57285986 > @nchammas - would you be interested in submitting a pr to change the qa script so that the timeout and failure message already prints the commit hash? @rxin Straight failures should already include the commit hash, [like here](https://github.com/apache/spark/pull/2588#issuecomment-57272186). (Note that messages like [this one](https://github.com/apache/spark/pull/2588#issuecomment-57272188) do not come from our script.) I can make a PR to add the commit hash to the timeout messages. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2377] Python API for Streaming
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2538#issuecomment-57286041 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21031/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org