[jira] [Updated] (SPARK-8300) DataFrame hint for broadcast join
[ https://issues.apache.org/jira/browse/SPARK-8300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust updated SPARK-8300: Shepherd: Michael Armbrust DataFrame hint for broadcast join - Key: SPARK-8300 URL: https://issues.apache.org/jira/browse/SPARK-8300 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Assignee: Reynold Xin It is not always possible to have perfect cardinality estimation. We should allow users to give a hint to the optimizer to do a broadcast join. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
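To illustrate the kind of hint being proposed, a minimal sketch from the DataFrame side, assuming a `broadcast` helper in org.apache.spark.sql.functions (the exact API is what this ticket is meant to decide; the DataFrame names are hypothetical):
{code}
import org.apache.spark.sql.functions.broadcast

// Mark the small dimension table so the optimizer favors a broadcast join,
// regardless of its cardinality estimate.
val joined = largeFacts.join(broadcast(smallDim), "key")
{code}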
[jira] [Commented] (SPARK-8342) Decimal Math beyond ~2^112 is broken
[ https://issues.apache.org/jira/browse/SPARK-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14584961#comment-14584961 ] Rene Treffer commented on SPARK-8342: - [~rxin] [~viirya] yes, I've tested with the patch applied (both the original report, which is now fixed, and my original problem, which is not fixed) :-S I've opened SPARK-8359 for the precision loss. Sorry for the confusion, this patch still causes a similar problem:
{code}
import org.apache.spark.sql.types.Decimal
val d = Decimal(Long.MaxValue,100,0) * Decimal(Long.MaxValue,100,0)
d.toJavaBigDecimal.unscaledValue.toString
8507059173023461584739690778423250
{code}
But cross-checking with bc says it should be 85070591730234615847396907784232501249 ((2^63 - 1) * (2^63 - 1)); 8507059173023461584739690778423250 is truncated. Calling changePrecision(100,0) after the multiplication results in 8507059173023461584739690778423250. Anyway, different bug, different ticket, although the problem is also present in this case, it's just hidden behind another bug 0.o Decimal Math beyond ~2^112 is broken Key: SPARK-8342 URL: https://issues.apache.org/jira/browse/SPARK-8342 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.5.0 Reporter: Rene Treffer Assignee: Liang-Chi Hsieh Fix For: 1.5.0 Here is a snippet from the spark-shell that should not happen:
{code}
scala> val d = Decimal(Long.MaxValue,100,0) * Decimal(Long.MaxValue,100,0)
d: org.apache.spark.sql.types.Decimal = 0
scala> d.toDebugString
res3: String = Decimal(expanded,0,1,0})
{code}
It looks like precision gets reset on some operations and values are then truncated. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
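As a reference, the expected unscaled value can also be cross-checked with arbitrary-precision arithmetic on the JVM, independent of Spark's Decimal (a minimal sketch):
{code}
import java.math.BigInteger

// (2^63 - 1) * (2^63 - 1), computed with no precision limit
val expected = BigInteger.valueOf(Long.MaxValue).multiply(BigInteger.valueOf(Long.MaxValue))
println(expected) // 85070591730234615847396907784232501249
{code}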
[jira] [Commented] (SPARK-8359) Spark SQL Decimal type precision loss on multiplication
[ https://issues.apache.org/jira/browse/SPARK-8359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585018#comment-14585018 ] Apache Spark commented on SPARK-8359: - User 'viirya' has created a pull request for this issue: https://github.com/apache/spark/pull/6814 Spark SQL Decimal type precision loss on multiplication --- Key: SPARK-8359 URL: https://issues.apache.org/jira/browse/SPARK-8359 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.5.0 Reporter: Rene Treffer It looks like the precision of decimal cannot be raised beyond ~2^112 without causing full value truncation. The following code computes the powers of two up to a specific exponent:
{code}
import org.apache.spark.sql.types.Decimal
val one = Decimal(1)
val two = Decimal(2)
def pow(n : Int) : Decimal = if (n <= 0) { one } else {
  val a = pow(n - 1)
  a.changePrecision(n,0)
  two.changePrecision(n,0)
  a * two
}
(109 to 120).foreach(n => println(pow(n).toJavaBigDecimal.unscaledValue.toString))
649037107316853453566312041152512
1298074214633706907132624082305024
2596148429267413814265248164610048
5192296858534827628530496329220096
1038459371706965525706099265844019
2076918743413931051412198531688038
4153837486827862102824397063376076
8307674973655724205648794126752152
1661534994731144841129758825350430
3323069989462289682259517650700860
6646139978924579364519035301401720
1329227995784915872903807060280344
{code}
Beyond ~2^112 the precision is truncated even though the precision was set to n and should thus handle 10^n without problems. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
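For comparison, the exact powers of two can be generated with Scala's arbitrary-precision BigInt; from 2^113 onward the values printed above are visibly shorter than these reference values (a sketch for cross-checking only):
{code}
// Reference values: exact powers of two, no precision limit involved
(109 to 120).foreach(n => println(BigInt(2).pow(n)))
{code}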
[jira] [Resolved] (SPARK-8284) Regualarized Extreme Learning Machine for MLLib
[ https://issues.apache.org/jira/browse/SPARK-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-8284. -- Resolution: Later I think this can be considered later if there is a working POC that's been published and shown to be effective. Regualarized Extreme Learning Machine for MLLib --- Key: SPARK-8284 URL: https://issues.apache.org/jira/browse/SPARK-8284 Project: Spark Issue Type: New Feature Components: MLlib Affects Versions: 1.3.1 Reporter: 李力 Extreme Learning Machine can get better generalization performance at a much faster learning speed for regression and classification problems, but the growing volume of datasets makes regression by ELM on very large scale datasets a challenging task. By analyzing the mechanism of the ELM algorithm, an efficient parallel ELM for regression is designed and implemented based on Spark. The experimental results demonstrate that the proposed parallel ELM for regression can efficiently handle very large datasets with good performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-8359) Spark SQL Decimal type precision loss on multiplication
[ https://issues.apache.org/jira/browse/SPARK-8359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-8359: --- Assignee: Apache Spark Spark SQL Decimal type precision loss on multiplication --- Key: SPARK-8359 URL: https://issues.apache.org/jira/browse/SPARK-8359 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.5.0 Reporter: Rene Treffer Assignee: Apache Spark It looks like the precision of decimal can not be raised beyond ~2^112 without causing full value truncation. The following code computes the power of two up to a specific point {code} import org.apache.spark.sql.types.Decimal val one = Decimal(1) val two = Decimal(2) def pow(n : Int) : Decimal = if (n = 0) { one } else { val a = pow(n - 1) a.changePrecision(n,0) two.changePrecision(n,0) a * two } (109 to 120).foreach(n = println(pow(n).toJavaBigDecimal.unscaledValue.toString)) 649037107316853453566312041152512 1298074214633706907132624082305024 2596148429267413814265248164610048 5192296858534827628530496329220096 1038459371706965525706099265844019 2076918743413931051412198531688038 4153837486827862102824397063376076 8307674973655724205648794126752152 1661534994731144841129758825350430 3323069989462289682259517650700860 6646139978924579364519035301401720 1329227995784915872903807060280344 {code} Beyond ~2^112 the precision is truncated even if the precision was set to n and should thus handle 10^n without problems.. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-8359) Spark SQL Decimal type precision loss on multiplication
[ https://issues.apache.org/jira/browse/SPARK-8359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-8359: --- Assignee: (was: Apache Spark) Spark SQL Decimal type precision loss on multiplication --- Key: SPARK-8359 URL: https://issues.apache.org/jira/browse/SPARK-8359 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.5.0 Reporter: Rene Treffer It looks like the precision of decimal can not be raised beyond ~2^112 without causing full value truncation. The following code computes the power of two up to a specific point {code} import org.apache.spark.sql.types.Decimal val one = Decimal(1) val two = Decimal(2) def pow(n : Int) : Decimal = if (n = 0) { one } else { val a = pow(n - 1) a.changePrecision(n,0) two.changePrecision(n,0) a * two } (109 to 120).foreach(n = println(pow(n).toJavaBigDecimal.unscaledValue.toString)) 649037107316853453566312041152512 1298074214633706907132624082305024 2596148429267413814265248164610048 5192296858534827628530496329220096 1038459371706965525706099265844019 2076918743413931051412198531688038 4153837486827862102824397063376076 8307674973655724205648794126752152 1661534994731144841129758825350430 3323069989462289682259517650700860 6646139978924579364519035301401720 1329227995784915872903807060280344 {code} Beyond ~2^112 the precision is truncated even if the precision was set to n and should thus handle 10^n without problems.. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-8300) DataFrame hint for broadcast join
[ https://issues.apache.org/jira/browse/SPARK-8300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust updated SPARK-8300: Assignee: Reynold Xin DataFrame hint for broadcast join - Key: SPARK-8300 URL: https://issues.apache.org/jira/browse/SPARK-8300 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Assignee: Reynold Xin It is not always possible to have perfect cardinality estimation. We should allow users to give hint to the optimizer to do broadcast join. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-8363) Move sqrt into math
Reynold Xin created SPARK-8363: -- Summary: Move sqrt into math Key: SPARK-8363 URL: https://issues.apache.org/jira/browse/SPARK-8363 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin It doesn't really belong in Arithmetic. It should also extend UnaryMathExpression. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-8361) Session of ThriftServer is still alive after I exit beeline
cen yuhai created SPARK-8361: Summary: Session of ThriftServer is still alive after I exit beeline Key: SPARK-8361 URL: https://issues.apache.org/jira/browse/SPARK-8361 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.4.0 Environment: centos6.2 spark-1.4.0 Reporter: cen yuhai I connected to the thriftserver through beeline, but after I exited beeline (for example with 'ctrl + c' or 'ctrl + z'), the session still appeared in the ThriftServer Web UI (SQL tab) with no Finish Time. If I exit with 'ctrl + d', it does get a finish time. After reviewing the code, I think the session is still alive. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-8362) Add unit tests for +, -, *, /
Reynold Xin created SPARK-8362: -- Summary: Add unit tests for +, -, *, / Key: SPARK-8362 URL: https://issues.apache.org/jira/browse/SPARK-8362 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Assignee: Reynold Xin Priority: Blocker -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
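A hedged sketch of the kind of test this ticket could add, assuming the checkEvaluation-style helper already used by the Catalyst expression test suites and the arithmetic expression classes in org.apache.spark.sql.catalyst.expressions:
{code}
// Evaluate each arithmetic expression on literal inputs and compare with the expected value.
checkEvaluation(Add(Literal(1), Literal(2)), 3)
checkEvaluation(Subtract(Literal(5), Literal(2)), 3)
checkEvaluation(Multiply(Literal(3), Literal(4)), 12)
checkEvaluation(Divide(Literal(10.0), Literal(2.0)), 5.0)
{code}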
[jira] [Resolved] (SPARK-8323) Remove mapOutputTracker field in TaskSchedulerImpl
[ https://issues.apache.org/jira/browse/SPARK-8323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-8323. -- Resolution: Won't Fix Target Version/s: (was: 1.4.1) Remove mapOutputTracker field in TaskSchedulerImpl -- Key: SPARK-8323 URL: https://issues.apache.org/jira/browse/SPARK-8323 Project: Spark Issue Type: Improvement Components: Scheduler, Spark Core Reporter: patrickliu Because TaskSchedulerImpl's mapOutputTracker field is only referenced once, in TaskSetManager, I think we could remove the mapOutputTracker field from the TaskSchedulerImpl class. Instead, we could reference the mapOutputTracker from SparkEnv directly in TaskSetManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
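A minimal sketch of the proposed direction (assuming TaskSetManager simply pulls the tracker from the current SparkEnv):
{code}
// Inside TaskSetManager, instead of going through TaskSchedulerImpl's field:
val mapOutputTracker = SparkEnv.get.mapOutputTracker
{code}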
[jira] [Created] (SPARK-8359) Spark SQL Decimal type precision loss on multiplication
Rene Treffer created SPARK-8359: --- Summary: Spark SQL Decimal type precision loss on multiplication Key: SPARK-8359 URL: https://issues.apache.org/jira/browse/SPARK-8359 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.5.0 Reporter: Rene Treffer It looks like the precision of decimal can not be raised beyond ~2^112 without causing full value truncation. The following code computes the power of two up to a specific point {code} import org.apache.spark.sql.types.Decimal val one = Decimal(1) val two = Decimal(2) def pow(n : Int) : Decimal = if (n = 0) { one } else { val a = pow(n - 1) a.changePrecision(n,0) two.changePrecision(n,0) a * two } (109 to 120).foreach(n = println(pow(n).toJavaBigDecimal.unscaledValue.toString)) 649037107316853453566312041152512 1298074214633706907132624082305024 2596148429267413814265248164610048 5192296858534827628530496329220096 1038459371706965525706099265844019 2076918743413931051412198531688038 4153837486827862102824397063376076 8307674973655724205648794126752152 1661534994731144841129758825350430 3323069989462289682259517650700860 6646139978924579364519035301401720 1329227995784915872903807060280344 {code} Beyond ~2^112 the precision is truncated even if the precision was set to n and should thus handle 10^n without problems.. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-8360) Streaming DataFrames
Reynold Xin created SPARK-8360: -- Summary: Streaming DataFrames Key: SPARK-8360 URL: https://issues.apache.org/jira/browse/SPARK-8360 Project: Spark Issue Type: Umbrella Components: SQL, Streaming Reporter: Reynold Xin Umbrella ticket to track what's needed to make streaming DataFrame a reality. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-8362) Add unit tests for +, -, *, /
[ https://issues.apache.org/jira/browse/SPARK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-8362: --- Assignee: Reynold Xin (was: Apache Spark) Add unit tests for +, -, *, / - Key: SPARK-8362 URL: https://issues.apache.org/jira/browse/SPARK-8362 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Assignee: Reynold Xin Priority: Blocker -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-8362) Add unit tests for +, -, *, /
[ https://issues.apache.org/jira/browse/SPARK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-8362: --- Assignee: Apache Spark (was: Reynold Xin) Add unit tests for +, -, *, / - Key: SPARK-8362 URL: https://issues.apache.org/jira/browse/SPARK-8362 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Assignee: Apache Spark Priority: Blocker -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8362) Add unit tests for +, -, *, /
[ https://issues.apache.org/jira/browse/SPARK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14584984#comment-14584984 ] Apache Spark commented on SPARK-8362: - User 'rxin' has created a pull request for this issue: https://github.com/apache/spark/pull/6813 Add unit tests for +, -, *, / - Key: SPARK-8362 URL: https://issues.apache.org/jira/browse/SPARK-8362 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Assignee: Reynold Xin Priority: Blocker -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6583) Support aggregated function in order by
[ https://issues.apache.org/jira/browse/SPARK-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585194#comment-14585194 ] Apache Spark commented on SPARK-6583: - User 'marmbrus' has created a pull request for this issue: https://github.com/apache/spark/pull/6816 Support aggregated function in order by --- Key: SPARK-6583 URL: https://issues.apache.org/jira/browse/SPARK-6583 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 1.3.0 Reporter: Yadong Qi Assignee: Yadong Qi -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
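A hypothetical query of the kind this improvement targets (table and column names are made up): ordering a grouped result by an aggregate that does not appear in the select list.
{code}
sqlContext.sql("SELECT key FROM src GROUP BY key ORDER BY max(value) DESC")
{code}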
[jira] [Resolved] (SPARK-8065) Add support for connecting to Hive 0.14 metastore
[ https://issues.apache.org/jira/browse/SPARK-8065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust resolved SPARK-8065. - Resolution: Fixed Fix Version/s: 1.5.0 Issue resolved by pull request 6627 [https://github.com/apache/spark/pull/6627] Add support for connecting to Hive 0.14 metastore - Key: SPARK-8065 URL: https://issues.apache.org/jira/browse/SPARK-8065 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Assignee: Marcelo Vanzin Fix For: 1.5.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
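A hedged sketch of how a specific metastore version is typically selected, assuming the spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars options cover the new 0.14 support (values shown are illustrative):
{code}
// Point the HiveContext at a 0.14 metastore client, resolving the client jars from Maven
hiveContext.setConf("spark.sql.hive.metastore.version", "0.14.0")
hiveContext.setConf("spark.sql.hive.metastore.jars", "maven")
{code}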
[jira] [Updated] (SPARK-8010) Implict promote Numeric type to String type in HiveTypeCoercion
[ https://issues.apache.org/jira/browse/SPARK-8010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust updated SPARK-8010: Shepherd: Michael Armbrust Implict promote Numeric type to String type in HiveTypeCoercion --- Key: SPARK-8010 URL: https://issues.apache.org/jira/browse/SPARK-8010 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.3.1 Reporter: Li Sheng Fix For: 1.3.1 Original Estimate: 48h Remaining Estimate: 48h 1. The query `select coalesce(null, 1, '1') from dual` causes an exception: java.lang.RuntimeException: Could not determine return type of Coalesce for IntegerType,StringType 2. The query `select case when true then 1 else '1' end from dual` causes an exception: java.lang.RuntimeException: Types in CASE WHEN must be the same or coercible to a common type: StringType != IntegerType I checked the code; the main cause is that HiveTypeCoercion doesn't do an implicit conversion when there is an IntegerType and a StringType. Numeric types should be promoted to string type in these cases instead of throwing exceptions, since Hive always does this. It needs to be fixed. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-8362) Add unit tests for +, -, *, /, %
[ https://issues.apache.org/jira/browse/SPARK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust resolved SPARK-8362. - Resolution: Fixed Fix Version/s: 1.5.0 Issue resolved by pull request 6813 [https://github.com/apache/spark/pull/6813] Add unit tests for +, -, *, /, % Key: SPARK-8362 URL: https://issues.apache.org/jira/browse/SPARK-8362 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Assignee: Reynold Xin Priority: Blocker Fix For: 1.5.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-8358) DataFrame explode with alias and * fails
[ https://issues.apache.org/jira/browse/SPARK-8358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust resolved SPARK-8358. - Resolution: Fixed Fix Version/s: 1.4.1 1.5.0 Issue resolved by pull request 6811 [https://github.com/apache/spark/pull/6811] DataFrame explode with alias and * fails Key: SPARK-8358 URL: https://issues.apache.org/jira/browse/SPARK-8358 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.4.0 Reporter: Michael Armbrust Assignee: Michael Armbrust Priority: Blocker Fix For: 1.5.0, 1.4.1
{code}
scala> Seq((Array("a"), 1)).toDF("a", "b").select(explode($"a").as("a"), $"*")
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'a
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:60)
  at org.apache.spark.sql.catalyst.expressions.Explode.elementTypes(generators.scala:107)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$AliasedGenerator$.unapply(Analyzer.scala:577)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16$$anonfun$22.apply(Analyzer.scala:535)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16$$anonfun$22.apply(Analyzer.scala:534)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  ...
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-8336) Fix NullPointerException with functions.rand()
[ https://issues.apache.org/jira/browse/SPARK-8336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust updated SPARK-8336: Target Version/s: 1.5.0 Shepherd: Reynold Xin Fix NullPointerException with functions.rand() -- Key: SPARK-8336 URL: https://issues.apache.org/jira/browse/SPARK-8336 Project: Spark Issue Type: Bug Components: SQL Reporter: Ted Yu Assignee: Ted Yu The problem was first reported by Justin Yip in the thread 'NullPointerException with functions.rand()' Here is how to reproduce the problem:
{code}
sqlContext.createDataFrame(Seq((1,2), (3, 100))).withColumn("index", rand(30)).show()
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-8336) Fix NullPointerException with functions.rand()
[ https://issues.apache.org/jira/browse/SPARK-8336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust updated SPARK-8336: Assignee: Ted Yu Fix NullPointerException with functions.rand() -- Key: SPARK-8336 URL: https://issues.apache.org/jira/browse/SPARK-8336 Project: Spark Issue Type: Bug Components: SQL Reporter: Ted Yu Assignee: Ted Yu The problem was first reported by Justin Yip in the thread 'NullPointerException with functions.rand()' Here is how to reproduce the problem:
{code}
sqlContext.createDataFrame(Seq((1,2), (3, 100))).withColumn("index", rand(30)).show()
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-8364) Add crosstab to SparkR DataFrames
Xiangrui Meng created SPARK-8364: Summary: Add crosstab to SparkR DataFrames Key: SPARK-8364 URL: https://issues.apache.org/jira/browse/SPARK-8364 Project: Spark Issue Type: New Feature Components: SparkR Reporter: Xiangrui Meng Assignee: Xiangrui Meng Add `crosstab` to SparkR DataFrames, which takes two column names and returns a local R data.frame. This is similar to `table` in R. However, `table` in SparkR is used for loading SQL tables as DataFrames. The return type is data.frame instead of table for `crosstab` to be compatible with Scala/Python. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
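For reference, the Scala DataFrame API already exposes this as a stat function; the SparkR version would mirror it while returning a local R data.frame (the DataFrame and column names below are hypothetical):
{code}
// Contingency table of two columns: distinct values of the first column become rows,
// distinct values of the second become columns, and cells hold co-occurrence counts.
val counts = df.stat.crosstab("gender", "country")
counts.show()
{code}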
[jira] [Updated] (SPARK-7712) Native Spark Window Functions Performance Improvements
[ https://issues.apache.org/jira/browse/SPARK-7712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yin Huai updated SPARK-7712: Priority: Critical (was: Major) Native Spark Window Functions Performance Improvements - Key: SPARK-7712 URL: https://issues.apache.org/jira/browse/SPARK-7712 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 1.4.0 Reporter: Herman van Hovell tot Westerflier Priority: Critical Original Estimate: 336h Remaining Estimate: 336h Hi All, After playing with the current spark window implementation, I tried to take this to the next level. My main goal is/was to address the following issues: Native Spark SQL Performance. *Native Spark SQL* The current implementation uses Hive UDAFs as its aggregation mechanism. We try to address the following issues by moving to a more 'native' Spark SQL approach: - Window functions require Hive. Some people (mostly by accident) use Spark SQL without Hive. Usage of UDAFs is still supported though. - Adding your own Aggregates requires you to write them in Hive instead of native Spark SQL. - Hive UDAFs are very well written and quite quick, but they are opaque in processing and memory management; this makes them hard to optimize. By using 'Native' Spark SQL constructs we can actually do a lot more optimization, for example AggregateEvaluation style Window processing (this would require us to move some of the code out of the AggregateEvaluation class into some Common base class), or Tungsten style memory management. *Performance* - Much better performance (10x) in running cases (e.g. BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) and UNBOUNDED FOLLOWING cases. The current implementation in spark uses a sliding window approach in these cases. This means that an aggregate is maintained for every row, so space usage is N (N being the number of rows). This also means that all these aggregates need to be updated separately, which takes N*(N-1)/2 updates. The running case differs from the Sliding case because we are only adding data to an aggregate function (no reset is required), we only need to maintain one aggregate (like in the UNBOUNDED PRECEDING AND UNBOUNDED case), update the aggregate for each row, and get the aggregate value after each update. This is what the new implementation does. This approach only uses 1 buffer, and only requires N updates; I am currently working on data with window sizes of 500-1000 doing running sums and this saves a lot of time. The CURRENT ROW AND UNBOUNDED FOLLOWING case also uses this approach and the fact that aggregate operations are commutative; there is one twist though: it will process the input buffer in reverse. - Fewer comparisons in the sliding case. The current implementation determines frame boundaries for every input row. The new implementation makes more use of the fact that the window is sorted, maintains the boundaries, and only moves them when the current row order changes. This is a minor improvement. - A single Window node is able to process all types of Frames for the same Partitioning/Ordering. This saves a little time/memory spent buffering and managing partitions. - A lot of the staging code is moved from the execution phase to the initialization phase. Minor performance improvement, and improves readability of the execution code. 
The original work including some benchmarking code for the running case can be here: https://github.com/hvanhovell/spark-window A PR has been created, this is still work in progress, and can be found here: https://github.com/apache/spark/pull/6278 Comments, feedback and other discussion is much appreciated. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
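To make the running case concrete, this is the kind of frame the description refers to (a sketch only; the table and column names are hypothetical, and window functions of this form currently require a HiveContext):
{code}
sqlContext.sql("""
  SELECT key, ts, value,
         sum(value) OVER (PARTITION BY key ORDER BY ts
                          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_sum
  FROM events
""")
{code}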
[jira] [Comment Edited] (SPARK-8320) Add example in streaming programming guide that shows union of multiple input streams
[ https://issues.apache.org/jira/browse/SPARK-8320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585248#comment-14585248 ] Neelesh Srinivas Salian edited comment on SPARK-8320 at 6/14/15 8:24 PM: - I would like to work on this JIRA. Could you please assign this to me? Thank you. was (Author: neelesh77): I would like to work on this JIRA. Could you please assign this to me. Thank you. Add example in streaming programming guide that shows union of multiple input streams - Key: SPARK-8320 URL: https://issues.apache.org/jira/browse/SPARK-8320 Project: Spark Issue Type: Sub-task Components: Streaming Affects Versions: 1.4.0 Reporter: Tathagata Das Priority: Minor Labels: starter The section on Level of Parallelism in Data Receiving has a Scala and a Java example for union of multiple input streams. A python example should be added. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8320) Add example in streaming programming guide that shows union of multiple input streams
[ https://issues.apache.org/jira/browse/SPARK-8320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585248#comment-14585248 ] Neelesh Srinivas Salian commented on SPARK-8320: I would like to work on this JIRA. Could you please assign this to me. Thank you. Add example in streaming programming guide that shows union of multiple input streams - Key: SPARK-8320 URL: https://issues.apache.org/jira/browse/SPARK-8320 Project: Spark Issue Type: Sub-task Components: Streaming Affects Versions: 1.4.0 Reporter: Tathagata Das Priority: Minor Labels: starter The section on Level of Parallelism in Data Receiving has a Scala and a Java example for union of multiple input streams. A python example should be added. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-8354) Fix off-by-factor-of-8 error when allocating scratch space in UnsafeFixedWidthAggregationMap
[ https://issues.apache.org/jira/browse/SPARK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-8354. --- Resolution: Fixed Fix Version/s: 1.4.1 1.5.0 Issue resolved by pull request 6809 [https://github.com/apache/spark/pull/6809] Fix off-by-factor-of-8 error when allocating scratch space in UnsafeFixedWidthAggregationMap Key: SPARK-8354 URL: https://issues.apache.org/jira/browse/SPARK-8354 Project: Spark Issue Type: Bug Components: SQL Reporter: Josh Rosen Assignee: Josh Rosen Fix For: 1.5.0, 1.4.1 UnsafeFixedWidthAggregationMap contains an off-by-factor-of-8 error when allocating row conversion scratch space: we take a size requirement, measured in bytes, then allocate a long array of that size. This means that we end up allocating 8x too much conversion space. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
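An illustration of the mistake described (not the actual Spark code): sizing a long array by a byte count allocates eight times the requested space, since each long is 8 bytes.
{code}
val requiredBytes = 64
val tooBig  = new Array[Long](requiredBytes)     // 64 longs = 512 bytes, 8x too much
val correct = new Array[Long](requiredBytes / 8) // 8 longs  = 64 bytes, as requested
{code}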
[jira] [Updated] (SPARK-8056) Design an easier way to construct schema for both Scala and Python
[ https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust updated SPARK-8056: Shepherd: Reynold Xin Design an easier way to construct schema for both Scala and Python -- Key: SPARK-8056 URL: https://issues.apache.org/jira/browse/SPARK-8056 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin StructType is fairly hard to construct, especially in Python. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
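For context, this is the construction the ticket considers cumbersome today (Scala shown; the Python equivalent is more verbose still; field names and types are illustrative):
{code}
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("scores", ArrayType(DoubleType), nullable = true)))
{code}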
[jira] [Updated] (SPARK-8357) Memory leakage on unsafe aggregation path with empty input
[ https://issues.apache.org/jira/browse/SPARK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust updated SPARK-8357: Priority: Critical (was: Minor) Target Version/s: 1.4.1, 1.5.0 Shepherd: Josh Rosen Assignee: Navis Memory leakage on unsafe aggregation path with empty input -- Key: SPARK-8357 URL: https://issues.apache.org/jira/browse/SPARK-8357 Project: Spark Issue Type: Bug Components: SQL Reporter: Navis Assignee: Navis Priority: Critical Currently, the unsafe-based hash map is released on the 'next' call, but if the input is empty, 'next' is never called and the map is never released. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-7289) Combine Limit and Sort to avoid total ordering
[ https://issues.apache.org/jira/browse/SPARK-7289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust updated SPARK-7289: Shepherd: Michael Armbrust Combine Limit and Sort to avoid total ordering -- Key: SPARK-7289 URL: https://issues.apache.org/jira/browse/SPARK-7289 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 1.3.1 Reporter: Fei Wang Assignee: Wenchen Fan Optimize following sql select key from (select * from testData order by key) t limit 5 from == Parsed Logical Plan == 'Limit 5 'Project ['key] 'Subquery t 'Sort ['key ASC], true 'Project [*] 'UnresolvedRelation [testData], None == Analyzed Logical Plan == Limit 5 Project [key#0] Subquery t Sort [key#0 ASC], true Project [key#0,value#1] Subquery testData LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Optimized Logical Plan == Limit 5 Project [key#0] Sort [key#0 ASC], true LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Physical Plan == Limit 5 Project [key#0] Sort [key#0 ASC], true Exchange (RangePartitioning [key#0 ASC], 5), [] PhysicalRDD [key#0,value#1], MapPartitionsRDD[1] to == Parsed Logical Plan == 'Limit 5 'Project ['key] 'Subquery t 'Sort ['key ASC], true 'Project [*] 'UnresolvedRelation [testData], None == Analyzed Logical Plan == Limit 5 Project [key#0] Subquery t Sort [key#0 ASC], true Project [key#0,value#1] Subquery testData LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Optimized Logical Plan == Project [key#0] Limit 5 Sort [key#0 ASC], true LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Physical Plan == Project [key#0] TakeOrdered 5, [key#0 ASC] PhysicalRDD [key#0,value#1], MapPartitionsRDD[1] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
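The effect of the optimization can be checked by explaining the query from the description once the rule is in place (a sketch; it assumes the testData temporary table is registered):
{code}
sqlContext.sql(
  "select key from (select * from testData order by key) t limit 5").explain()
// Expected: a TakeOrdered (top-k) operator instead of a full Sort + Exchange followed by Limit
{code}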
[jira] [Reopened] (SPARK-7289) Combine Limit and Sort to avoid total ordering
[ https://issues.apache.org/jira/browse/SPARK-7289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust reopened SPARK-7289: - Assignee: Wenchen Fan Combine Limit and Sort to avoid total ordering -- Key: SPARK-7289 URL: https://issues.apache.org/jira/browse/SPARK-7289 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 1.3.1 Reporter: Fei Wang Assignee: Wenchen Fan Optimize following sql select key from (select * from testData order by key) t limit 5 from == Parsed Logical Plan == 'Limit 5 'Project ['key] 'Subquery t 'Sort ['key ASC], true 'Project [*] 'UnresolvedRelation [testData], None == Analyzed Logical Plan == Limit 5 Project [key#0] Subquery t Sort [key#0 ASC], true Project [key#0,value#1] Subquery testData LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Optimized Logical Plan == Limit 5 Project [key#0] Sort [key#0 ASC], true LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Physical Plan == Limit 5 Project [key#0] Sort [key#0 ASC], true Exchange (RangePartitioning [key#0 ASC], 5), [] PhysicalRDD [key#0,value#1], MapPartitionsRDD[1] to == Parsed Logical Plan == 'Limit 5 'Project ['key] 'Subquery t 'Sort ['key ASC], true 'Project [*] 'UnresolvedRelation [testData], None == Analyzed Logical Plan == Limit 5 Project [key#0] Subquery t Sort [key#0 ASC], true Project [key#0,value#1] Subquery testData LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Optimized Logical Plan == Project [key#0] Limit 5 Sort [key#0 ASC], true LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Physical Plan == Project [key#0] TakeOrdered 5, [key#0 ASC] PhysicalRDD [key#0,value#1], MapPartitionsRDD[1] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-7289) Combine Limit and Sort to avoid total ordering
[ https://issues.apache.org/jira/browse/SPARK-7289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust updated SPARK-7289: Target Version/s: 1.5.0 (was: 1.4.0) Combine Limit and Sort to avoid total ordering -- Key: SPARK-7289 URL: https://issues.apache.org/jira/browse/SPARK-7289 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 1.3.1 Reporter: Fei Wang Assignee: Wenchen Fan Optimize following sql select key from (select * from testData order by key) t limit 5 from == Parsed Logical Plan == 'Limit 5 'Project ['key] 'Subquery t 'Sort ['key ASC], true 'Project [*] 'UnresolvedRelation [testData], None == Analyzed Logical Plan == Limit 5 Project [key#0] Subquery t Sort [key#0 ASC], true Project [key#0,value#1] Subquery testData LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Optimized Logical Plan == Limit 5 Project [key#0] Sort [key#0 ASC], true LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Physical Plan == Limit 5 Project [key#0] Sort [key#0 ASC], true Exchange (RangePartitioning [key#0 ASC], 5), [] PhysicalRDD [key#0,value#1], MapPartitionsRDD[1] to == Parsed Logical Plan == 'Limit 5 'Project ['key] 'Subquery t 'Sort ['key ASC], true 'Project [*] 'UnresolvedRelation [testData], None == Analyzed Logical Plan == Limit 5 Project [key#0] Subquery t Sort [key#0 ASC], true Project [key#0,value#1] Subquery testData LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Optimized Logical Plan == Project [key#0] Limit 5 Sort [key#0 ASC], true LogicalRDD [key#0,value#1], MapPartitionsRDD[1] == Physical Plan == Project [key#0] TakeOrdered 5, [key#0 ASC] PhysicalRDD [key#0,value#1], MapPartitionsRDD[1] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-8247) string function: instr
[ https://issues.apache.org/jira/browse/SPARK-8247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Armbrust updated SPARK-8247: Target Version/s: 1.5.0 string function: instr -- Key: SPARK-8247 URL: https://issues.apache.org/jira/browse/SPARK-8247 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Assignee: Cheng Hao instr(string str, string substr): int Returns the position of the first occurrence of substr in str. Returns null if either of the arguments are null and returns 0 if substr could not be found in str. Be aware that this is not zero based. The first character in str has index 1. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
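Expected semantics per the description, shown as hypothetical calls once the function is implemented (1-based positions, 0 when the substring is absent, null when an argument is null):
{code}
sqlContext.sql("SELECT instr('SparkSQL', 'SQL')").show() // expected result: 6
sqlContext.sql("SELECT instr('SparkSQL', 'xyz')").show() // expected result: 0
{code}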
[jira] [Created] (SPARK-8365) pyspark does not retain --packages or --jars passed on the command line as of 1.4.0
Don Drake created SPARK-8365: Summary: pyspark does not retain --packages or --jars passed on the command line as of 1.4.0 Key: SPARK-8365 URL: https://issues.apache.org/jira/browse/SPARK-8365 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 1.4.0 Reporter: Don Drake I downloaded the pre-compiled Spark 1.4.0 and attempted to run an existing Python Spark application against it and got the following error: py4j.protocol.Py4JJavaError: An error occurred while calling o90.save. : java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv I pass the following on the command-line to my spark-submit: --packages com.databricks:spark-csv_2.10:1.0.3 This worked fine on 1.3.1, but not in 1.4. I was able to replicate it with the following pyspark: {code} a = {'a':1.0, 'b':'asdf'} rdd = sc.parallelize([a]) df = sqlContext.createDataFrame(rdd) df.save(/tmp/d.csv, com.databricks.spark.csv) {code} Even using the new df.write.format('com.databricks.spark.csv').save('/tmp/d.csv') gives the same error. I see it was added in the web UI: file:/Users/drake/.ivy2/jars/com.databricks_spark-csv_2.10-1.0.3.jarAdded By User file:/Users/drake/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar Added By User http://10.0.0.222:56871/jars/com.databricks_spark-csv_2.10-1.0.3.jarAdded By User http://10.0.0.222:56871/jars/org.apache.commons_commons-csv-1.1.jar Added By User Thoughts? *I also attempted using the Scala spark-shell to load a csv using the same package and it worked just fine, so this seems specific to pyspark.* -Don Gory details: {code} $ pyspark --packages com.databricks:spark-csv_2.10:1.0.3 Python 2.7.6 (default, Sep 9 2014, 15:04:36) [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin Type help, copyright, credits or license for more information. Ivy Default Cache set to: /Users/drake/.ivy2/cache The jars for the packages stored in: /Users/drake/.ivy2/jars :: loading settings :: url = jar:file:/Users/drake/spark/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml com.databricks#spark-csv_2.10 added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 confs: [default] found com.databricks#spark-csv_2.10;1.0.3 in central found org.apache.commons#commons-csv;1.1 in central :: resolution report :: resolve 590ms :: artifacts dl 17ms :: modules in use: com.databricks#spark-csv_2.10;1.0.3 from central in [default] org.apache.commons#commons-csv;1.1 from central in [default] - | |modules|| artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| - | default | 2 | 0 | 0 | 0 || 2 | 0 | - :: retrieving :: org.apache.spark#spark-submit-parent confs: [default] 0 artifacts copied, 2 already retrieved (0kB/15ms) Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/06/13 11:06:08 INFO SparkContext: Running Spark version 1.4.0 2015-06-13 11:06:08.921 java[19233:2145789] Unable to load realm info from SCDynamicStore 15/06/13 11:06:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 15/06/13 11:06:09 WARN Utils: Your hostname, Dons-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 10.0.0.222 instead (on interface en0) 15/06/13 11:06:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 15/06/13 11:06:09 INFO SecurityManager: Changing view acls to: drake 15/06/13 11:06:09 INFO SecurityManager: Changing modify acls to: drake 15/06/13 11:06:09 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(drake); users with modify permissions: Set(drake) 15/06/13 11:06:10 INFO Slf4jLogger: Slf4jLogger started 15/06/13 11:06:10 INFO Remoting: Starting remoting 15/06/13 11:06:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.0.222:56870] 15/06/13 11:06:10 INFO Utils: Successfully started service 'sparkDriver' on port 56870. 15/06/13 11:06:10 INFO SparkEnv: Registering MapOutputTracker 15/06/13 11:06:10 INFO SparkEnv: Registering BlockManagerMaster 15/06/13 11:06:10 INFO DiskBlockManager: Created local directory at /private/var/folders/7_/k5h82ws97b95v5f5h8wf9j0hgn/T/spark-f36f39f5-7f82-42e0-b3e0-9eb1e1cc0816/blockmgr-a1412b71-fe56-429c-a193-ce3fb95d2ffd 15/06/13
[jira] [Created] (SPARK-8366) When task fails and append a new one, the ExecutorAllocationManager can't sense the new tasks
meiyoula created SPARK-8366: --- Summary: When task fails and append a new one, the ExecutorAllocationManager can't sense the new tasks Key: SPARK-8366 URL: https://issues.apache.org/jira/browse/SPARK-8366 Project: Spark Issue Type: Bug Components: Spark Core Reporter: meiyoula I use the *dynamic executor allocation* feature. When one executor is killed, all the tasks on it fail. When the new tasks are appended, a new executor won't be added. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6259) Python API for LDA
[ https://issues.apache.org/jira/browse/SPARK-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585296#comment-14585296 ] Yu Ishikawa commented on SPARK-6259: I have tried to implement it like above. https://github.com/yu-iskw/spark/commit/85fdf2d9ca98813c843f51c1874d1b8adfa7b242 Python API for LDA -- Key: SPARK-6259 URL: https://issues.apache.org/jira/browse/SPARK-6259 Project: Spark Issue Type: Improvement Components: MLlib, PySpark Affects Versions: 1.3.0 Reporter: Joseph K. Bradley Add Python API for LDA. This task may be blocked by ongoing work on LDA which may require API changes: * [SPARK-5563] * [SPARK-5556] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
[ https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585389#comment-14585389 ] Apache Spark commented on SPARK-8367: - User 'SaintBacchus' has created a pull request for this issue: https://github.com/apache/spark/pull/6818 ReliableKafka will loss data when `spark.streaming.blockInterval` was 0 --- Key: SPARK-8367 URL: https://issues.apache.org/jira/browse/SPARK-8367 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: SaintBacchus
{code:title=BlockGenerator.scala|borderStyle=solid}
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
{code}
If *spark.streaming.blockInterval* was 0, the *blockId* in the code will always be the same, because *time* was 0 and *blockIntervalMs* was 0 too.
{code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }
{code}
If the *blockId* is the same, Streaming will commit the *offset* before the data is really consumed (the data is waiting to be committed, but the offset has already been updated and committed by a previous commit). So when an exception occurs, the *offset* has been committed but the data will be lost, since it was still in memory and not yet consumed. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-8366) When task fails and append a new one, the ExecutorAllocationManager can't sense the new tasks
[ https://issues.apache.org/jira/browse/SPARK-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-8366: --- Assignee: Apache Spark When task fails and append a new one, the ExecutorAllocationManager can't sense the new tasks - Key: SPARK-8366 URL: https://issues.apache.org/jira/browse/SPARK-8366 Project: Spark Issue Type: Bug Components: Spark Core Reporter: meiyoula Assignee: Apache Spark I use the *dynamic executor allocation* function. Then one executor is killed, all the tasks on it are failed. When the new tasks are appended, the new executor won't added. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-8366) When task fails and append a new one, the ExecutorAllocationManager can't sense the new tasks
[ https://issues.apache.org/jira/browse/SPARK-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-8366: --- Assignee: (was: Apache Spark) When task fails and append a new one, the ExecutorAllocationManager can't sense the new tasks - Key: SPARK-8366 URL: https://issues.apache.org/jira/browse/SPARK-8366 Project: Spark Issue Type: Bug Components: Spark Core Reporter: meiyoula I use the *dynamic executor allocation* function. Then one executor is killed, all the tasks on it are failed. When the new tasks are appended, the new executor won't added. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8366) When task fails and append a new one, the ExecutorAllocationManager can't sense the new tasks
[ https://issues.apache.org/jira/browse/SPARK-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585371#comment-14585371 ] Apache Spark commented on SPARK-8366: - User 'XuTingjun' has created a pull request for this issue: https://github.com/apache/spark/pull/6817 When task fails and append a new one, the ExecutorAllocationManager can't sense the new tasks - Key: SPARK-8366 URL: https://issues.apache.org/jira/browse/SPARK-8366 Project: Spark Issue Type: Bug Components: Spark Core Reporter: meiyoula I use the *dynamic executor allocation* function. Then one executor is killed, all the tasks on it are failed. When the new tasks are appended, the new executor won't added. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
[ https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] SaintBacchus updated SPARK-8367: Description: {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug(Last element in + blockId + is + newBlockBuffer.last) } } catch { case ie: InterruptedException = logInfo(Block updating timer thread was interrupted) case e: Exception = reportError(Error in block updating thread, e) } } {code} If *spark.streaming.blockInterval* was 0, the *blockId* in the code will always be the same because of *time* was 0 and *blockIntervalMs* was 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() } {code} If the *blockId* was the same, Streaming will put current data into later *offset*. So when exception occures, the *offset* had commit but the data will loss was: {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug(Last element in + blockId + is + newBlockBuffer.last) } } catch { case ie: InterruptedException = logInfo(Block updating timer thread was interrupted) case e: Exception = reportError(Error in block updating thread, e) } } {code} If *spark.streaming.blockInterval* was 0, the *blockId* in the code will always be the same because of *time* was 0 and *blockIntervalMs* was 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() } {code} If the *blockId* was the same, Streaming will put current data into previous *offset* ReliableKafka will loss data when `spark.streaming.blockInterval` was 0 --- Key: SPARK-8367 URL: https://issues.apache.org/jira/browse/SPARK-8367 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: SaintBacchus {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. 
*/ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug(Last element in + blockId + is + newBlockBuffer.last) } } catch { case ie: InterruptedException = logInfo(Block updating timer thread was interrupted) case e: Exception = reportError(Error in block updating thread, e) } } {code} If *spark.streaming.blockInterval* was 0, the *blockId* in the code will always be the same because of *time* was 0 and *blockIntervalMs* was 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() }
[jira] [Updated] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
[ https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] SaintBacchus updated SPARK-8367: Description: {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() } {code} If the *blockId* is the same, Streaming will commit the *offset* before the data has really been consumed. So when an exception occurs, the *offset* has been committed but the data is lost, since it was still in memory and had not been consumed yet. was: {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() } {code} If the *blockId* is the same, Streaming will associate the current data with a later *offset*. So when an exception occurs, the *offset* has been committed but the data is lost, since it was still in memory and had not been consumed yet. ReliableKafka will loss data when `spark.streaming.blockInterval` was 0 --- Key: SPARK-8367 URL: https://issues.apache.org/jira/browse/SPARK-8367 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: SaintBacchus {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to.
*/ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current
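To make the block-id collision described above concrete, here is a minimal, self-contained sketch (illustrative only, not taken from the ticket; the receiver id and the zero interval are assumed values). Because StreamBlockId is a case class, two blocks generated with time = 0 and blockIntervalMs = 0 compare equal, so a later offset snapshot stored under that id silently replaces the earlier one in blockOffsetMap.
{code}
import org.apache.spark.storage.StreamBlockId

object BlockIdCollisionDemo {
  def main(args: Array[String]): Unit = {
    val receiverId = 0        // illustrative receiver id
    val time = 0L             // the timer hands the generator time = 0
    val blockIntervalMs = 0L  // spark.streaming.blockInterval = 0
    val first = StreamBlockId(receiverId, time - blockIntervalMs)
    val second = StreamBlockId(receiverId, time - blockIntervalMs)
    // Case-class equality: both ids are the same key, so the second offset
    // snapshot stored under it overwrites the first one.
    println(first == second)  // prints: true
  }
}
{code}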
[jira] [Updated] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
[ https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] SaintBacchus updated SPARK-8367: Description: {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() } {code} If the *blockId* is the same, Streaming will associate the current data with a later *offset*. So when an exception occurs, the *offset* has been committed but the data is lost, since it was still in memory and had not been consumed yet. was: {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() } {code} If the *blockId* is the same, Streaming will associate the current data with a later *offset*. So when an exception occurs, the *offset* has been committed but the data is lost ReliableKafka will loss data when `spark.streaming.blockInterval` was 0 --- Key: SPARK-8367 URL: https://issues.apache.org/jira/browse/SPARK-8367 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: SaintBacchus {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to.
*/ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot
[jira] [Created] (SPARK-8369) Support dependency jar and files on HDFS in standalone cluster mode
Dong Lei created SPARK-8369: --- Summary: Support dependency jar and files on HDFS in standalone cluster mode Key: SPARK-8369 URL: https://issues.apache.org/jira/browse/SPARK-8369 Project: Spark Issue Type: New Feature Components: Spark Core Reporter: Dong Lei Currently, in standalone cluster mode, Spark can take care of the app jar whether it is specified with file:// or hdfs://, but the dependencies specified by --jars and --files do not support an hdfs:// prefix. For example: spark-submit ... --jars hdfs://path1/1.jar,hdfs://path2/2.jar --files hdfs://path3/3.file,hdfs://path4/4.file hdfs://path5/app.jar Only app.jar will be downloaded to the driver and distributed to the executors; the others (1.jar, 2.jar, 3.file, 4.file) will not. I think such a feature would be useful for users. To support it, I think we can treat the jars and files like the app jar in DriverRunner: download them and replace the remote addresses with local addresses, so that DriverWrapper is not aware of the change. The problem is that replacing these addresses is not as easy as replacing the location of the app jar, because we only have a placeholder (USER_JAR) for the app jar, so we may need to do some string matching to achieve it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
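A rough sketch of the approach proposed in the ticket (download the hdfs:// dependencies on the driver and substitute local paths before they reach DriverWrapper). The helper name localizeDependencies and the work-directory handling are assumptions for illustration, not existing Spark code; it only mirrors, per dependency, what the ticket says DriverRunner already does for the app jar.
{code}
import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object DependencyLocalizer {
  /** Hypothetical helper: copy each remote dependency into workDir and return its local path. */
  def localizeDependencies(uris: Seq[String], workDir: File, hadoopConf: Configuration): Seq[String] = {
    uris.map { uri =>
      if (uri.startsWith("hdfs://")) {
        val src = new Path(uri)
        val fs = src.getFileSystem(hadoopConf)
        val dst = new File(workDir, src.getName)
        fs.copyToLocalFile(src, new Path(dst.getAbsolutePath))
        dst.getAbsolutePath // rewrite the remote address to the local copy
      } else {
        uri // file:// and plain local paths are left untouched
      }
    }
  }
}
{code}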
[jira] [Created] (SPARK-8368) ClassNotFoundException in closure for map
CHEN Zhiwei created SPARK-8368: -- Summary: ClassNotFoundException in closure for map Key: SPARK-8368 URL: https://issues.apache.org/jira/browse/SPARK-8368 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.4.0 Environment: CentOS 6.5, Java 1.7.0_67, Scala 2.10.4. The project is built on Windows 7 and run in a Spark standalone cluster (or local) mode on CentOS 6.x. Reporter: CHEN Zhiwei After upgrading the cluster from Spark 1.3.0 to 1.4.0 (rc4), I encountered the following exception: ==begin exception Exception in thread "main" java.lang.ClassNotFoundException: com.yhd.ycache.magic.Model$$anonfun$9$$anonfun$10 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:278) at org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:455) at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1891) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.map(RDD.scala:293) at org.apache.spark.sql.DataFrame.map(DataFrame.scala:1210) at com.yhd.ycache.magic.Model$.main(SSExample.scala:239) at com.yhd.ycache.magic.Model.main(SSExample.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ===end exception=== I have simplified the code that causes this issue, as follows: ==begin code== object Model extends Serializable{ def main(args: Array[String]) { val Array(sql/*,usePartition,numClasses_, numTrees_,impurity_, maxDepth_, maxBins_*/) = args val sparkConf = new SparkConf().setAppName("Mode Example") val sc = new SparkContext(sparkConf) val hive = new HiveContext(sc) //get data by hive sql val rows = hive.sql(sql) val data = rows.map(r => { val arr = r.toSeq.toArray val label = 1.0 //string2Int(arr(r.length-1).toString).toDouble def fmap = (input:
Any) => 1.0 //string2Int(input.toString).toDouble val feature = arr.map(_ => 1.0) LabeledPoint(label, Vectors.dense(feature)) }) data.count() } } =end code=== This code runs fine in spark-shell, but fails when it is submitted to a Spark cluster (standalone or local mode). I tried the same code on Spark 1.3.0 (local mode), and no exception was encountered. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
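A hedged workaround sketch for the reproducer above, not a confirmed fix or the resolution of this ticket: hoist the nested def fmap out of the map closure, so the closure cleaner has one less anonymous inner class to resolve on the driver. The helper name featurize is hypothetical.
{code}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.DataFrame

object Model extends Serializable {
  // Declared once at object level instead of as a `def` nested inside the closure.
  val fmap: Any => Double = _ => 1.0

  // Hypothetical helper wrapping the body of the original main().
  def featurize(rows: DataFrame): Long = {
    val data = rows.map { r =>
      val arr = r.toSeq.toArray
      val label = fmap(arr.headOption.getOrElse(1.0))
      val feature = arr.map(_ => 1.0)
      LabeledPoint(label, Vectors.dense(feature))
    }
    data.count()
  }
}
{code}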
[jira] [Updated] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
[ https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] SaintBacchus updated SPARK-8367: Description: {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() } {code} If the *blockId* is the same, Streaming will commit the *offset* before the data has really been consumed (the data is still waiting to be committed, but the offset has already been updated and committed by the previous commit). So when an exception occurs, the *offset* has been committed but the data is lost, since it was still in memory and had not been consumed yet. was: {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() } {code} If the *blockId* is the same, Streaming will commit the *offset* before the data has really been consumed. So when an exception occurs, the *offset* has been committed but the data is lost, since it was still in memory and had not been consumed yet. ReliableKafka will loss data when `spark.streaming.blockInterval` was 0 --- Key: SPARK-8367 URL: https://issues.apache.org/jira/browse/SPARK-8367 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: SaintBacchus {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to.
*/ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too.
[jira] [Assigned] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
[ https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-8367: --- Assignee: (was: Apache Spark) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0 --- Key: SPARK-8367 URL: https://issues.apache.org/jira/browse/SPARK-8367 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: SaintBacchus {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() } {code} If the *blockId* is the same, Streaming will commit the *offset* before the data has really been consumed (the data is still waiting to be committed, but the offset has already been updated and committed by the previous commit). So when an exception occurs, the *offset* has been committed but the data is lost, since it was still in memory and had not been consumed yet. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
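Regardless of where the eventual fix for SPARK-8367 lands, one defensive option (a sketch under assumptions, not the committed patch) is to reject a non-positive block interval up front instead of silently generating colliding block ids; checkedBlockIntervalMs is a hypothetical helper name.
{code}
import org.apache.spark.SparkConf

object BlockIntervalCheck {
  // Fail fast when spark.streaming.blockInterval is zero or negative, since a
  // zero interval makes every StreamBlockId produced by BlockGenerator identical.
  def checkedBlockIntervalMs(conf: SparkConf): Long = {
    val ms = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
    require(ms > 0, s"'spark.streaming.blockInterval' must be positive, got $ms ms")
    ms
  }
}
{code}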
[jira] [Assigned] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
[ https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-8367: --- Assignee: Apache Spark ReliableKafka will loss data when `spark.streaming.blockInterval` was 0 --- Key: SPARK-8367 URL: https://issues.apache.org/jira/browse/SPARK-8367 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: SaintBacchus Assignee: Apache Spark {code:title=BlockGenerator.scala|borderStyle=solid} /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockIntervalMs) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } } {code} If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always be the same, because *time* is 0 and *blockIntervalMs* is 0 too. {code:title=ReliableKafkaReceiver.scala|borderStyle=solid} private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { // Get a snapshot of current offset map and store with related block id. val offsetSnapshot = topicPartitionOffsetMap.toMap blockOffsetMap.put(blockId, offsetSnapshot) topicPartitionOffsetMap.clear() } {code} If the *blockId* is the same, Streaming will commit the *offset* before the data has really been consumed (the data is still waiting to be committed, but the offset has already been updated and committed by the previous commit). So when an exception occurs, the *offset* has been committed but the data is lost, since it was still in memory and had not been consumed yet. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-8368) ClassNotFoundException in closure for map
[ https://issues.apache.org/jira/browse/SPARK-8368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] CHEN Zhiwei updated SPARK-8368: --- Description: After upgrading the cluster from Spark 1.3.0 to 1.4.0 (rc4), I encountered the following exception: ==begin exception Exception in thread "main" java.lang.ClassNotFoundException: com.yhd.ycache.magic.Model$$anonfun$9$$anonfun$10 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:278) at org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:455) at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1891) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.map(RDD.scala:293) at org.apache.spark.sql.DataFrame.map(DataFrame.scala:1210) at com.yhd.ycache.magic.Model$.main(SSExample.scala:239) at com.yhd.ycache.magic.Model.main(SSExample.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ===end exception=== I have simplified the code that causes this issue, as follows: ==begin code== ``` object Model extends Serializable{ def main(args: Array[String]) { val Array(sql/*,usePartition,numClasses_, numTrees_,impurity_, maxDepth_, maxBins_*/) = args val sparkConf = new SparkConf().setAppName("Mode Example") val sc = new SparkContext(sparkConf) val hive = new HiveContext(sc) //get data by hive sql val rows = hive.sql(sql) val data = rows.map(r => { val arr = r.toSeq.toArray val label = 1.0 //string2Int(arr(r.length-1).toString).toDouble def fmap = (input: Any) => 1.0 //string2Int(input.toString).toDouble val feature = arr.map(_ => 1.0) LabeledPoint(label, Vectors.dense(feature)) }) data.count() } } ``` =end code=== This code runs fine in spark-shell, but fails when it is submitted to a Spark
cluster (standalone or local mode). I tried the same code on Spark 1.3.0 (local mode), and no exception was encountered. was: After upgrading the cluster from Spark 1.3.0 to 1.4.0 (rc4), I encountered the following exception: ==begin exception Exception in thread "main" java.lang.ClassNotFoundException: com.yhd.ycache.magic.Model$$anonfun$9$$anonfun$10 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:278) at
[jira] [Updated] (SPARK-8368) ClassNotFoundException in closure for map
[ https://issues.apache.org/jira/browse/SPARK-8368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] CHEN Zhiwei updated SPARK-8368: --- Description: After upgrading the cluster from Spark 1.3.0 to 1.4.0 (rc4), I encountered the following exception: ==begin exception {quote} Exception in thread "main" java.lang.ClassNotFoundException: com.yhd.ycache.magic.Model$$anonfun$9$$anonfun$10 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:278) at org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:455) at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1891) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.map(RDD.scala:293) at org.apache.spark.sql.DataFrame.map(DataFrame.scala:1210) at com.yhd.ycache.magic.Model$.main(SSExample.scala:239) at com.yhd.ycache.magic.Model.main(SSExample.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) {quote} ===end exception=== I have simplified the code that causes this issue, as follows: ==begin code== {quote} object Model extends Serializable{ def main(args: Array[String]) { val Array(sql) = args val sparkConf = new SparkConf().setAppName("Mode Example") val sc = new SparkContext(sparkConf) val hive = new HiveContext(sc) //get data by hive sql val rows = hive.sql(sql) val data = rows.map(r => { val arr = r.toSeq.toArray val label = 1.0 def fmap = (input: Any) => 1.0 val feature = arr.map(_ => 1.0) LabeledPoint(label, Vectors.dense(feature)) }) data.count() } } {quote} =end code=== This code runs fine in spark-shell, but fails when it is submitted to a Spark cluster (standalone or local mode). I tried the same code on Spark 1.3.0 (local mode), and no exception was encountered.
was: After upgrading the cluster from Spark 1.3.0 to 1.4.0 (rc4), I encountered the following exception: ==begin exception Exception in thread "main" java.lang.ClassNotFoundException: com.yhd.ycache.magic.Model$$anonfun$9$$anonfun$10 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:278) at org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:455) at
[jira] [Commented] (SPARK-8283) udf_struct test failure
[ https://issues.apache.org/jira/browse/SPARK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585398#comment-14585398 ] Yijie Shen commented on SPARK-8283: --- I'll take this udf_struct test failure --- Key: SPARK-8283 URL: https://issues.apache.org/jira/browse/SPARK-8283 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Priority: Blocker {code} [info] - udf_struct *** FAILED *** (704 milliseconds) [info] Failed to execute query using catalyst: [info] Error: org.apache.spark.sql.catalyst.expressions.Literal cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression [info] java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.Literal cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression [info]at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$1.apply(complexTypes.scala:64) [info]at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [info]at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [info]at scala.collection.immutable.List.foreach(List.scala:318) [info]at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) [info]at scala.collection.AbstractTraversable.map(Traversable.scala:105) [info]at org.apache.spark.sql.catalyst.expressions.CreateStruct.dataType$lzycompute(complexTypes.scala:64) [info]at org.apache.spark.sql.catalyst.expressions.CreateStruct.dataType(complexTypes.scala:61) [info]at org.apache.spark.sql.catalyst.expressions.CreateStruct.dataType(complexTypes.scala:55) [info]at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(ExtractValue.scala:43) [info]at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$8$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:353) [info]at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$8$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:340) [info]at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286) [info]at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286) [info]at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) [info]at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:285) [info]at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:299) [info]at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) [info]at scala.collection.Iterator$class.foreach(Iterator.scala:727) [info]at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) [info]at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) [info]at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) [info]at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) [info]at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) [info]at scala.collection.AbstractIterator.to(Iterator.scala:1157) [info]at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
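Judging from the stack trace, CreateStruct.dataType casts every child to NamedExpression, so a struct() call whose arguments are plain literals is enough to hit the ClassCastException during analysis. An illustrative reproduction sketch follows (assumptions: this is not the exact udf_struct golden query, and the Hive test table src exists).
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object StructLiteralRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("udf_struct repro").setMaster("local[1]"))
    val hive = new HiveContext(sc)
    // struct() with literal-only arguments hands CreateStruct a Literal child,
    // which the analyzer then tries to cast to NamedExpression.
    hive.sql("SELECT struct(1, 'a') FROM src LIMIT 1").collect().foreach(println)
    sc.stop()
  }
}
{code}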