spark git commit: [SPARK-9858][SPARK-9859][SPARK-9861][SQL] Add an ExchangeCoordinator to estimate the number of post-shuffle partitions for aggregates and joins
Repository: spark Updated Branches: refs/heads/master c34c27fe9 -> d728d5c98 [SPARK-9858][SPARK-9859][SPARK-9861][SQL] Add an ExchangeCoordinator to estimate the number of post-shuffle partitions for aggregates and joins https://issues.apache.org/jira/browse/SPARK-9858 https://issues.apache.org/jira/browse/SPARK-9859 https://issues.apache.org/jira/browse/SPARK-9861 Author: Yin Huai Closes #9276 from yhuai/numReducer. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d728d5c9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d728d5c9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d728d5c9 Branch: refs/heads/master Commit: d728d5c98658c44ed2949b55d36edeaa46f8c980 Parents: c34c27f Author: Yin Huai Authored: Tue Nov 3 00:12:49 2015 -0800 Committer: Yin Huai Committed: Tue Nov 3 00:12:49 2015 -0800 -- .../catalyst/plans/physical/partitioning.scala | 8 + .../scala/org/apache/spark/sql/SQLConf.scala| 27 ++ .../apache/spark/sql/execution/Exchange.scala | 217 - .../sql/execution/ExchangeCoordinator.scala | 260 ++ .../spark/sql/execution/ShuffledRowRDD.scala| 134 +- .../execution/ExchangeCoordinatorSuite.scala| 479 +++ .../spark/sql/execution/PlannerSuite.scala | 8 +- .../execution/UnsafeRowSerializerSuite.scala| 7 +- .../sql/execution/joins/InnerJoinSuite.scala| 19 +- 9 files changed, 1115 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d728d5c9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 86b9417..9312c81 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -165,6 +165,11 @@ sealed trait Partitioning { * produced by `A` could have also been produced by `B`. 
*/ def guarantees(other: Partitioning): Boolean = this == other + + def withNumPartitions(newNumPartitions: Int): Partitioning = { +throw new IllegalStateException( + s"It is not allowed to call withNumPartitions method of a ${this.getClass.getSimpleName}") + } } object Partitioning { @@ -249,6 +254,9 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) case _ => false } + override def withNumPartitions(newNumPartitions: Int): HashPartitioning = { +HashPartitioning(expressions, newNumPartitions) + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d728d5c9/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 6f28920..ed8b634 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -233,6 +233,25 @@ private[spark] object SQLConf { defaultValue = Some(200), doc = "The default number of partitions to use when shuffling data for joins or aggregations.") + val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE = +longConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", + defaultValue = Some(64 * 1024 * 1024), + doc = "The target post-shuffle input size in bytes of a task.") + + val ADAPTIVE_EXECUTION_ENABLED = booleanConf("spark.sql.adaptive.enabled", +defaultValue = Some(false), +doc = "When true, enable adaptive query execution.") + + val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS = +intConf("spark.sql.adaptive.minNumPostShufflePartitions", + defaultValue = Some(-1), + doc = "The advisory minimal number of post-shuffle partitions provided to " + +"ExchangeCoordinator. This setting is used in our test to make sure we " + +"have enough parallelism to expose issues that will not be exposed with a " + +"single partition. When the value is a non-positive value, this setting will" + +"not be provided to ExchangeCoordinator.", + isPublic = false) + val TUNGSTEN_ENABLED = booleanConf("spark.sql.tungsten.enabled", defaultValue = Some(true), doc = "When true, use the optimized Tungsten physical execution backend which explicitly " + @@ -487,6 +506,14 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def numShufflePartitions: Int = getC
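Since the new knobs above are ordinary SQLConf entries, a rough usage sketch looks like the following. This is not part of the commit: the config keys and the 64MB default are taken from the SQLConf diff, while the `sqlContext` handle and the `events` table are assumed for illustration.

```scala
// Hedged sketch: enabling the ExchangeCoordinator's adaptive settings.
sqlContext.setConf("spark.sql.adaptive.enabled", "true")
sqlContext.setConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
  (64 * 1024 * 1024).toString) // target ~64MB of post-shuffle input per task

// With the coordinator enabled, the number of post-shuffle partitions for this
// aggregate is estimated from map-output sizes at runtime instead of always
// using spark.sql.shuffle.partitions (default 200).
val counts = sqlContext.table("events").groupBy("userId").count()
counts.show()
```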
spark git commit: [SPARK-10429] [SQL] make mutableProjection atomic
Repository: spark Updated Branches: refs/heads/master d728d5c98 -> 67e23b39a [SPARK-10429] [SQL] make mutableProjection atomic Right now, SQL's mutable projection updates every value of the mutable row after it evaluates the corresponding expression. This makes the behavior of MutableProjection confusing and complicates the implementation of common aggregate functions like stddev, because developers need to be aware that when evaluating the {{i+1}}th expression of a mutable projection, the {{i}}th slot of the mutable row has already been updated. This PR makes the MutableProjection atomic by generating all the results of the expressions first, then copying them into mutableRow. A micro-benchmark showed no notable performance difference between using class members and local variables. cc yhuai Author: Davies Liu Closes #9422 from davies/atomic_mutable and squashes the following commits: bbc1758 [Davies Liu] support wide table 8a0ae14 [Davies Liu] fix bug bec07da [Davies Liu] refactor 2891628 [Davies Liu] make mutableProjection atomic Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67e23b39 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67e23b39 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67e23b39 Branch: refs/heads/master Commit: 67e23b39ac3cdee06668fa9131951278b9731e29 Parents: d728d5c Author: Davies Liu Authored: Tue Nov 3 11:42:08 2015 +0100 Committer: Michael Armbrust Committed: Tue Nov 3 11:42:08 2015 +0100 -- .../sql/catalyst/expressions/Projection.scala | 13 +- .../expressions/aggregate/functions.scala | 154 --- .../codegen/GenerateMutableProjection.scala | 28 +++- 3 files changed, 97 insertions(+), 98 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/67e23b39/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index afe52e6..a6fe730 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection} -import org.apache.spark.sql.types.{DataType, Decimal, StructType, _} -import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} +import org.apache.spark.sql.types.{DataType, StructType} /** * A [[Projection]] that is calculated by calling the `eval` of each of the specified expressions. 
@@ -62,6 +61,8 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = this(expressions.map(BindReferences.bindReference(_, inputSchema))) + private[this] val buffer = new Array[Any](expressions.size) + expressions.foreach(_.foreach { case n: Nondeterministic => n.setInitialValues() case _ => @@ -79,7 +80,13 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu override def apply(input: InternalRow): InternalRow = { var i = 0 while (i < exprArray.length) { - mutableRow(i) = exprArray(i).eval(input) + // Store the result into buffer first, to make the projection atomic (needed by aggregation) + buffer(i) = exprArray(i).eval(input) + i += 1 +} +i = 0 +while (i < exprArray.length) { + mutableRow(i) = buffer(i) i += 1 } mutableRow http://git-wip-us.apache.org/repos/asf/spark/blob/67e23b39/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala index 5d2eb7b..f2c3eca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala @@ -57,37 +57,37 @@ case class Average(child: Expression) extends DeclarativeAggregate { case _ => DoubleType } - private val currentSum = AttributeReference("currentSum", sumDataType)() - private val currentCount = AttributeReference("currentCount",
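A toy sketch of the hazard described above (deliberately not Spark's MutableProjection API): if expression i+1 reads a slot that expression i has just overwritten, writing results straight into the output row changes the answer, while buffering first, as this patch does, preserves the old values.

```scala
// Two "expressions" over a 2-slot row: slot 0 gets old(0)+1, slot 1 gets old(0)*2.
val exprs: Seq[Array[Any] => Any] = Seq(
  r => r(0).asInstanceOf[Int] + 1,
  r => r(0).asInstanceOf[Int] * 2
)

// Non-atomic: slot 1 ends up 22 because slot 0 was already overwritten with 11.
val row1 = Array[Any](10, 0)
exprs.zipWithIndex.foreach { case (e, i) => row1(i) = e(row1) }

// Atomic (what this PR does): evaluate everything into a buffer first, then copy.
val row2 = Array[Any](10, 0)
val buffer = exprs.map(e => e(row2))
buffer.zipWithIndex.foreach { case (v, i) => row2(i) = v }
// row1 == Array(11, 22), row2 == Array(11, 20)
```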
spark git commit: [SPARK-11188] [SQL] Elide stacktraces in bin/spark-sql for AnalysisExceptions
Repository: spark Updated Branches: refs/heads/branch-1.5 b85bf8f49 -> 5604ce9c1 [SPARK-11188] [SQL] Elide stacktraces in bin/spark-sql for AnalysisExceptions Only print the error message to the console for Analysis Exceptions in sql-shell Author: Dilip Biswal Closes #9374 from dilipbiswal/dkb-11188-v152 and squashes the following commits: a58cedc [Dilip Biswal] [SPARK-11188][SQL] Elide stacktraces in bin/spark-sql for AnalysisExceptions Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5604ce9c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5604ce9c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5604ce9c Branch: refs/heads/branch-1.5 Commit: 5604ce9c1b9bfb3b8ad046915eeea787289d281e Parents: b85bf8f Author: Dilip Biswal Authored: Tue Nov 3 12:14:01 2015 +0100 Committer: Michael Armbrust Committed: Tue Nov 3 12:14:01 2015 +0100 -- .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 10 +- .../spark/sql/hive/thriftserver/SparkSQLDriver.scala| 10 +++--- .../apache/spark/sql/hive/thriftserver/CliSuite.scala | 12 ++-- 3 files changed, 26 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5604ce9c/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index e58f8ca..212bd2c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -22,6 +22,8 @@ import scala.collection.JavaConversions._ import java.io._ import java.util.{ArrayList => JArrayList, Locale} +import org.apache.spark.sql.AnalysisException + import jline.console.ConsoleReader import jline.console.history.FileHistory @@ -298,6 +300,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { driver.init() val out = sessionState.out + val err = sessionState.err val start: Long = System.currentTimeMillis() if (sessionState.getIsVerbose) { out.println(cmd) @@ -308,7 +311,12 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { ret = rc.getResponseCode if (ret != 0) { -console.printError(rc.getErrorMessage()) +// For analysis exception, only the error is printed out to the console. 
+rc.getException() match { + case e : AnalysisException => +err.println(s"""Error in query: ${e.getMessage}""") + case _ => err.println(rc.getErrorMessage()) +} driver.close() return ret } http://git-wip-us.apache.org/repos/asf/spark/blob/5604ce9c/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 77272ae..e44fa5e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.thriftserver import java.util.{ArrayList => JArrayList, List => JList} +import org.apache.spark.sql.AnalysisException import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema} @@ -63,9 +64,12 @@ private[hive] class SparkSQLDriver( tableSchema = getResultSetSchema(execution) new CommandProcessorResponse(0) } catch { - case cause: Throwable => -logError(s"Failed in [$command]", cause) -new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null) +case ae: AnalysisException => + logDebug(s"Failed in [$command]", ae) + new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(ae), null, ae) +case cause: Throwable => + logError(s"Failed in [$command]", cause) + new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null, cause) } } http://git-wip-us.apache.org/repos/asf/spark/blob/5604ce9c/sql/hive-thriftserver/src/test/scala
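Stripped of the Hive plumbing, the reporting split introduced here follows a simple pattern. The sketch below is a simplification (the real code also builds a CommandProcessorResponse and closes the driver); it uses only the imports that already appear in the diff.

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.commons.lang3.exception.ExceptionUtils

def reportFailure(command: String, cause: Throwable): Unit = cause match {
  case ae: AnalysisException =>
    // Expected user error: a single, readable message on the console.
    System.err.println(s"Error in query: ${ae.getMessage}")
  case other =>
    // Unexpected failure: keep the full stack trace for debugging.
    System.err.println(s"Failed in [$command]\n${ExceptionUtils.getStackTrace(other)}")
}
```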
spark git commit: [SPARK-11436] [SQL] rebind right encoder when join 2 datasets
Repository: spark Updated Branches: refs/heads/master 67e23b39a -> 425ff03f5 [SPARK-11436] [SQL] rebind right encoder when join 2 datasets When we join 2 datasets, we combine the 2 encoders into a tupled one and use it as the encoder for the joined dataset. Assuming both encoders are flat, their `constructExpression`s both reference the first element of the input row. However, when we combine the 2 encoders, the schema of the input row changes, and the right encoder should now reference the second element of the input row. So we should rebind the right encoder to let it know the new schema of the input row before combining them. Author: Wenchen Fan Closes #9391 from cloud-fan/join and squashes the following commits: 846d3ab [Wenchen Fan] rebind right encoder when join 2 datasets Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/425ff03f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/425ff03f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/425ff03f Branch: refs/heads/master Commit: 425ff03f5ac4f3ddda1ba06656e620d5426f4209 Parents: 67e23b3 Author: Wenchen Fan Authored: Tue Nov 3 12:47:39 2015 +0100 Committer: Michael Armbrust Committed: Tue Nov 3 12:47:39 2015 +0100 -- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++- .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 8  2 files changed, 11 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/425ff03f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index e0ab5f5..ed98a25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -390,7 +390,9 @@ class Dataset[T] private( val rightEncoder = if (other.encoder.flat) other.encoder else other.encoder.nested(rightData.toAttribute) implicit val tuple2Encoder: Encoder[(T, U)] = - ExpressionEncoder.tuple(leftEncoder, rightEncoder) + ExpressionEncoder.tuple( +leftEncoder, +rightEncoder.rebind(right.output, left.output ++ right.output)) withPlan[(T, U)](other) { (left, right) => Project( http://git-wip-us.apache.org/repos/asf/spark/blob/425ff03f/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 993e6d2..95b8d05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -214,4 +214,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext { cogrouped, 1 -> "a#", 2 -> "#q", 3 -> "abcfoo#w", 5 -> "hello#er") } + + test("SPARK-11436: we should rebind right encoder when join 2 datasets") { +val ds1 = Seq("1", "2").toDS().as("a") +val ds2 = Seq(2, 3).toDS().as("b") + +val joined = ds1.joinWith(ds2, $"a.value" === $"b.value") +checkAnswer(joined, ("2", 2)) + } }
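A minimal illustration of why the rebind is needed, using a toy decoder rather than Spark's encoder classes: each flat encoder is effectively bound to ordinal 0 of its own row, and once the two rows are combined for the join, the right-hand value has moved to ordinal 1.

```scala
// Toy model: a decoder knows which ordinal of a row it reads.
case class Decoder[T](ordinal: Int, read: Any => T)

val left  = Decoder[String](0, _.asInstanceOf[String])
val right = Decoder[Int](0, _.asInstanceOf[Int]) // also bound to ordinal 0

// After the join the row is (leftValue, rightValue); without "rebinding" the
// right decoder to ordinal 1, it would read the left value by mistake.
val reboundRight = right.copy(ordinal = 1)

val joinedRow: Seq[Any] = Seq("2", 2)
val pair = (left.read(joinedRow(left.ordinal)),
            reboundRight.read(joinedRow(reboundRight.ordinal)))
// pair == ("2", 2)
```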
spark git commit: [SPARK-11404] [SQL] Support for groupBy using column expressions
Repository: spark Updated Branches: refs/heads/master 425ff03f5 -> b86f2cab6 [SPARK-11404] [SQL] Support for groupBy using column expressions This PR adds a new method `groupBy(cols: Column*)` to `Dataset` that allows users to group using column expressions instead of a lambda function. Since the return type of these expressions is not known at compile time, we just set the key type as a generic `Row`. If the user would like to work the key in a type-safe way, they can call `grouped.asKey[Type]`, which is also added in this PR. ```scala val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS() val grouped = ds.groupBy($"_1").asKey[String] val agged = grouped.mapGroups { case (g, iter) => Iterator((g, iter.map(_._2).sum)) } agged.collect() res0: Array(("a", 30), ("b", 3), ("c", 1)) ``` Author: Michael Armbrust Closes #9359 from marmbrus/columnGroupBy and squashes the following commits: bbcb03b [Michael Armbrust] Update DatasetSuite.scala 8fd2908 [Michael Armbrust] Update DatasetSuite.scala 0b0e2f8 [Michael Armbrust] [SPARK-11404] [SQL] Support for groupBy using column expressions Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b86f2cab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b86f2cab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b86f2cab Branch: refs/heads/master Commit: b86f2cab67989f09ba1ba8604e52cd4b1e44e436 Parents: 425ff03 Author: Michael Armbrust Authored: Tue Nov 3 13:02:17 2015 +0100 Committer: Michael Armbrust Committed: Tue Nov 3 13:02:17 2015 +0100 -- .../scala/org/apache/spark/sql/Dataset.scala| 36 +-- .../org/apache/spark/sql/GroupedDataset.scala | 28 ++-- .../org/apache/spark/sql/DatasetSuite.scala | 48 3 files changed, 106 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b86f2cab/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index ed98a25..7b75aee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.Inner @@ -78,9 +79,17 @@ class Dataset[T] private( * * */ /** - * Returns a new `Dataset` where each record has been mapped on to the specified type. - * TODO: should bind here... - * TODO: document binding rules + * Returns a new `Dataset` where each record has been mapped on to the specified type. The + * method used to map columns depend on the type of `U`: + * - When `U` is a class, fields for the class will be mapped to columns of the same name + *(case sensitivity is determined by `spark.sql.caseSensitive`) + * - When `U` is a tuple, the columns will be be mapped by ordinal (i.e. the first column will + *be assigned to `_1`). + * - When `U` is a primitive type (i.e. String, Int, etc). then the first column of the + *[[DataFrame]] will be used. + * + * If the schema of the [[DataFrame]] does not match the desired `U` type, you can use `select` + * along with `alias` or `as` to rearrange or rename as required. 
* @since 1.6.0 */ def as[U : Encoder]: Dataset[U] = { @@ -225,6 +234,27 @@ class Dataset[T] private( withGroupingKey.newColumns) } + /** + * Returns a [[GroupedDataset]] where the data is grouped by the given [[Column]] expressions. + * @since 1.6.0 + */ + @scala.annotation.varargs + def groupBy(cols: Column*): GroupedDataset[Row, T] = { +val withKeyColumns = logicalPlan.output ++ cols.map(_.expr).map(UnresolvedAlias) +val withKey = Project(withKeyColumns, logicalPlan) +val executed = sqlContext.executePlan(withKey) + +val dataAttributes = executed.analyzed.output.dropRight(cols.size) +val keyAttributes = executed.analyzed.output.takeRight(cols.size) + +new GroupedDataset( + RowEncoder(keyAttributes.toStructType), + encoderFor[T], + executed, + dataAttributes, + keyAttributes) + } + /* ** * * Typed Relational * * ** */ http://git-wip-us.apache.org/repos/asf/spark/blob/b86f2cab/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala --
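A usage sketch grouping by a derived column expression, which is the point of the new overload. Note that this targets the experimental 1.6-era Dataset API exactly as described in this PR (`asKey`, `mapGroups`); those names changed in later releases.

```scala
import org.apache.spark.sql.functions._
import sqlContext.implicits._

val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()

// Group by an arbitrary expression instead of a lambda; the key type is not
// known at compile time, so it starts out as an untyped Row...
val grouped = ds.groupBy(upper($"_1"))

// ...and can be recovered as a typed key with asKey.
val sums = grouped.asKey[String].mapGroups {
  case (key, rows) => Iterator(key -> rows.map(_._2).sum)
}
// sums.collect() => Array(("A", 30), ("B", 3), ("C", 1))
```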
spark git commit: [SPARK-11344] Made ApplicationDescription and DriverDescription case classes
Repository: spark Updated Branches: refs/heads/master b86f2cab6 -> 233e534ac [SPARK-11344] Made ApplicationDescription and DriverDescription case classes DriverDescription refactored to case class because it included no mutable fields. ApplicationDescription had one mutable field, which was appUiUrl. This field was set by the driver to point to the driver web UI. Master was modifying this field when the application was removed to redirect requests to history server. This was wrong because objects which are sent over the wire should be immutable. Now appUiUrl is immutable in ApplicationDescription and always points to the driver UI even if it is already shutdown. The UI url which master exposes to the user and modifies dynamically is now included into ApplicationInfo - a data object which describes the application state internally in master. That URL in ApplicationInfo is initialised with the value from ApplicationDescription. ApplicationDescription also included value user, which is now a part of case class fields. Author: Jacek Lewandowski Closes #9299 from jacek-lewandowski/SPARK-11344. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/233e534a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/233e534a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/233e534a Branch: refs/heads/master Commit: 233e534ac43ea25ac1b0e6a985f6928d46c5d03a Parents: b86f2ca Author: Jacek Lewandowski Authored: Tue Nov 3 12:46:11 2015 + Committer: Sean Owen Committed: Tue Nov 3 12:46:11 2015 + -- .../spark/deploy/ApplicationDescription.scala | 33 ++-- .../apache/spark/deploy/DriverDescription.scala | 21 - .../spark/deploy/master/ApplicationInfo.scala | 7 + .../org/apache/spark/deploy/master/Master.scala | 12 --- .../deploy/master/ui/ApplicationPage.scala | 2 +- .../spark/deploy/master/ui/MasterPage.scala | 2 +- .../apache/spark/deploy/DeployTestUtils.scala | 3 +- 7 files changed, 34 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/233e534a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index ae99432..78bbd5c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -19,30 +19,17 @@ package org.apache.spark.deploy import java.net.URI -private[spark] class ApplicationDescription( -val name: String, -val maxCores: Option[Int], -val memoryPerExecutorMB: Int, -val command: Command, -var appUiUrl: String, -val eventLogDir: Option[URI] = None, +private[spark] case class ApplicationDescription( +name: String, +maxCores: Option[Int], +memoryPerExecutorMB: Int, +command: Command, +appUiUrl: String, +eventLogDir: Option[URI] = None, // short name of compression codec used when writing event logs, if any (e.g. 
lzf) -val eventLogCodec: Option[String] = None, -val coresPerExecutor: Option[Int] = None) - extends Serializable { - - val user = System.getProperty("user.name", "") - - def copy( - name: String = name, - maxCores: Option[Int] = maxCores, - memoryPerExecutorMB: Int = memoryPerExecutorMB, - command: Command = command, - appUiUrl: String = appUiUrl, - eventLogDir: Option[URI] = eventLogDir, - eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription = -new ApplicationDescription( - name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec) +eventLogCodec: Option[String] = None, +coresPerExecutor: Option[Int] = None, +user: String = System.getProperty("user.name", "")) { override def toString: String = "ApplicationDescription(" + name + ")" } http://git-wip-us.apache.org/repos/asf/spark/blob/233e534a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index 659fb43..1f5626a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -17,21 +17,12 @@ package org.apache.spark.deploy -private[deploy] class DriverDescription( -val jarUrl: String, -val mem: Int, -val cores: Int, -val supervise: Boolean, -val c
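The design choice reads roughly as follows in miniature (toy classes, not the real deploy module): anything sent over the wire stays an immutable case class, "changes" to it are fresh copies, and genuinely mutable bookkeeping such as the currently exposed UI URL lives in a master-local object initialised from the message.

```scala
// Wire message: immutable.
case class AppDescription(name: String, appUiUrl: String)

// Master-local bookkeeping: mutable, never sent to other processes.
class AppInfo(desc: AppDescription) {
  // Starts at the driver UI and can be repointed to the history server on removal.
  var curAppUiUrl: String = desc.appUiUrl
}

val desc = AppDescription("my-app", "http://driver-host:4040")
val info = new AppInfo(desc)
info.curAppUiUrl = "http://history-host:18080/history/app-1234"

// The description itself is never mutated; a tweaked copy is a new value.
val renamed = desc.copy(name = "my-app-v2")
```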
spark git commit: [SPARK-10533][SQL] handle scientific notation in sqlParser
Repository: spark Updated Branches: refs/heads/master 233e534ac -> d188a6776 [SPARK-10533][SQL] handle scientific notation in sqlParser https://issues.apache.org/jira/browse/SPARK-10533 val df = sqlContext.createDataFrame(Seq(("a",1.0),("b",2.0),("c",3.0))) df.filter("_2 < 2.0e1").show Scientific notation didn't work. Author: Daoyuan Wang Closes #9085 from adrian-wang/scinotation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d188a677 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d188a677 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d188a677 Branch: refs/heads/master Commit: d188a67762dfc09929e30931509be5851e29dfa5 Parents: 233e534 Author: Daoyuan Wang Authored: Tue Nov 3 22:30:23 2015 +0800 Committer: Cheng Lian Committed: Tue Nov 3 22:30:23 2015 +0800 -- .../spark/sql/catalyst/AbstractSparkSQLParser.scala | 15 +-- .../org/apache/spark/sql/catalyst/SqlParser.scala| 11 +++ .../scala/org/apache/spark/sql/DataFrameSuite.scala | 11 --- 3 files changed, 32 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d188a677/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala index 2bac08e..04ac4f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala @@ -82,6 +82,10 @@ class SqlLexical extends StdLexical { override def toString: String = chars } + case class DecimalLit(chars: String) extends Token { +override def toString: String = chars + } + /* This is a work around to support the lazy setting */ def initialize(keywords: Seq[String]): Unit = { reserved.clear() @@ -102,8 +106,12 @@ class SqlLexical extends StdLexical { } override lazy val token: Parser[Token] = -( identChar ~ (identChar | digit).* ^^ - { case first ~ rest => processIdent((first :: rest).mkString) } +( rep1(digit) ~ ('.' ~> digit.*).? ~ (exp ~> sign.? ~ rep1(digit)) ^^ { +case i ~ None ~ (sig ~ rest) => + DecimalLit(i.mkString + "e" + sig.mkString + rest.mkString) +case i ~ Some(d) ~ (sig ~ rest) => + DecimalLit(i.mkString + "." + d.mkString + "e" + sig.mkString + rest.mkString) + } | digit.* ~ identChar ~ (identChar | digit).* ^^ { case first ~ middle ~ rest => processIdent((first ++ (middle :: rest)).mkString) } | rep1(digit) ~ ('.' ~> digit.*).? 
^^ { @@ -125,6 +133,9 @@ class SqlLexical extends StdLexical { override def identChar: Parser[Elem] = letter | elem('_') + private lazy val sign: Parser[Elem] = elem("s", c => c == '+' || c == '-') + private lazy val exp: Parser[Elem] = elem("e", c => c == 'E' || c == 'e') + override def whitespace: Parser[Any] = ( whitespaceChar | '/' ~ '*' ~ comment http://git-wip-us.apache.org/repos/asf/spark/blob/d188a677/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index d7567e8..1ba559d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -337,6 +337,9 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser { | sign.? ~ unsignedFloat ^^ { case s ~ f => Literal(toDecimalOrDouble(s.getOrElse("") + f)) } +| sign.? ~ unsignedDecimal ^^ { + case s ~ d => Literal(toDecimalOrDouble(s.getOrElse("") + d)) +} ) protected lazy val unsignedFloat: Parser[String] = @@ -344,6 +347,14 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser { | elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars) ) + protected lazy val unsignedDecimal: Parser[String] = +( "." ~> decimalLit ^^ { u => "0." + u } +| elem("scientific_notation", _.isInstanceOf[lexical.DecimalLit]) ^^ (_.chars) +) + + def decimalLit: Parser[String] = +elem("scientific_notation", _.isInstanceOf[lexical.DecimalLit]) ^^ (_.chars) + protected lazy val sign: Parser[String] = ("+" | "-") protected lazy val integral: Parser[String] = http://git-wip-us.apache.org/repos/asf/spark/blob/d188a677/sql/core/src/test/scala/or
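A few literal forms the extended lexer now accepts, as a hedged sketch: the new DecimalLit token covers digits, an optional fraction, and an exponent with optional sign, while a leading sign on the whole literal is still handled by the parser's existing `sign` rule. The DataFrame below is made up for illustration.

```scala
val df = sqlContext
  .createDataFrame(Seq(("a", 1.0), ("b", 2.0), ("c", 30.0)))
  .toDF("k", "v")

df.filter("v < 2.0e1").show()   // 2.0e1  == 20.0 (the example from the JIRA)
df.filter("v < 2E1").show()     // 2E1    == 20.0, upper-case exponent
df.filter("v > 2.5e-1").show()  // 2.5e-1 == 0.25, signed exponent
df.filter("v >= -1.5e1").show() // leading sign comes from the parser, not the token
```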
spark git commit: [SPARK-11256] Mark all Stage/ResultStage/ShuffleMapStage internal state as private.
Repository: spark Updated Branches: refs/heads/master d188a6776 -> 57446eb69 [SPARK-11256] Mark all Stage/ResultStage/ShuffleMapStage internal state as private. Author: Reynold Xin Closes #9219 from rxin/stage-cleanup1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57446eb6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57446eb6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57446eb6 Branch: refs/heads/master Commit: 57446eb69ceb6b8856ab22b54abb22b47b80f841 Parents: d188a67 Author: Reynold Xin Authored: Tue Nov 3 07:06:00 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 3 07:06:00 2015 -0800 -- .../apache/spark/scheduler/DAGScheduler.scala | 33 +-- .../apache/spark/scheduler/ResultStage.scala| 19 +- .../spark/scheduler/ShuffleMapStage.scala | 61 ++-- .../org/apache/spark/scheduler/Stage.scala | 5 +- 4 files changed, 80 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/57446eb6/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 995862e..5673fbf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import scala.collection.Map -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Stack} +import scala.collection.mutable.{HashMap, HashSet, Stack} import scala.concurrent.duration._ import scala.language.existentials import scala.language.postfixOps @@ -535,10 +535,8 @@ class DAGScheduler( jobIdToActiveJob -= job.jobId activeJobs -= job job.finalStage match { - case r: ResultStage => -r.resultOfJob = None - case m: ShuffleMapStage => -m.mapStageJobs = m.mapStageJobs.filter(_ != job) + case r: ResultStage => r.removeActiveJob() + case m: ShuffleMapStage => m.removeActiveJob(job) } } @@ -848,7 +846,7 @@ class DAGScheduler( val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job -finalStage.resultOfJob = Some(job) +finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( @@ -880,7 +878,7 @@ class DAGScheduler( val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got map stage job %s (%s) with %d output partitions".format( - jobId, callSite.shortForm, dependency.rdd.partitions.size)) + jobId, callSite.shortForm, dependency.rdd.partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) @@ -888,7 +886,7 @@ class DAGScheduler( val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job -finalStage.mapStageJobs = job :: finalStage.mapStageJobs +finalStage.addActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( @@ -950,12 +948,12 @@ class DAGScheduler( // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. 
outputCommitCoordinator.stageStart(stage.id) -val taskIdToLocations = try { +val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => - val job = s.resultOfJob.get + val job = s.activeJob.get partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) @@ -1016,7 +1014,7 @@ class DAGScheduler( } case stage: ResultStage => - val job = stage.resultOfJob.get + val job = stage.activeJob.get partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) @@ -1132,7 +1130,7 @@ class DAGScheduler( // Cast to ResultStage here because it's part of the ResultTask // TODO Refactor this out to a function that accepts a
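The refactoring follows one small pattern throughout: a public `var` that other classes used to poke directly becomes private, and the few mutations callers actually need become intention-revealing methods. A compressed sketch with stand-in types (not the real scheduler classes):

```scala
case class ActiveJob(jobId: Int) // stand-in for Spark's ActiveJob

class ResultStageLike {
  // Was a public `var resultOfJob: Option[ActiveJob]` mutated from DAGScheduler.
  private var _activeJob: Option[ActiveJob] = None

  def activeJob: Option[ActiveJob] = _activeJob
  def setActiveJob(job: ActiveJob): Unit = { _activeJob = Some(job) }
  def removeActiveJob(): Unit = { _activeJob = None }
}
```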
spark git commit: [SPARK-10304] [SQL] Partition discovery should throw an exception if the dir structure is invalid
Repository: spark Updated Branches: refs/heads/master 57446eb69 -> d6035d97c [SPARK-10304] [SQL] Partition discovery should throw an exception if the dir structure is invalid JIRA: https://issues.apache.org/jira/browse/SPARK-10304 This patch detects if the structure of partition directories is not valid. The test cases are from #8547. Thanks zhzhan. cc liancheng Author: Liang-Chi Hsieh Closes #8840 from viirya/detect_invalid_part_dir. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6035d97 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6035d97 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6035d97 Branch: refs/heads/master Commit: d6035d97c91fe78b1336ade48134252915263ea6 Parents: 57446eb Author: Liang-Chi Hsieh Authored: Tue Nov 3 07:41:50 2015 -0800 Committer: Davies Liu Committed: Tue Nov 3 07:41:50 2015 -0800 -- .../datasources/PartitioningUtils.scala | 36 ++-- .../ParquetPartitionDiscoverySuite.scala| 36 ++-- 2 files changed, 59 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d6035d97/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 0a2007e..628c5e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -77,9 +77,11 @@ private[sql] object PartitioningUtils { defaultPartitionName: String, typeInference: Boolean): PartitionSpec = { // First, we need to parse every partition's path and see if we can find partition values. -val pathsWithPartitionValues = paths.flatMap { path => - parsePartition(path, defaultPartitionName, typeInference).map(path -> _) -} +val (partitionValues, optBasePaths) = paths.map { path => + parsePartition(path, defaultPartitionName, typeInference) +}.unzip + +val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _)) if (pathsWithPartitionValues.isEmpty) { // This dataset is not partitioned. @@ -87,6 +89,12 @@ private[sql] object PartitioningUtils { } else { // This dataset is partitioned. We need to check whether all partitions have the same // partition columns and resolve potential type conflicts. + val basePaths = optBasePaths.flatMap(x => x) + assert( +basePaths.distinct.size == 1, +"Conflicting directory structures detected. Suspicious paths:\b" + + basePaths.mkString("\n\t", "\n\t", "\n\n")) + val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues) // Creates the StructType which represents the partition columns. @@ -110,12 +118,12 @@ private[sql] object PartitioningUtils { } /** - * Parses a single partition, returns column names and values of each partition column. For - * example, given: + * Parses a single partition, returns column names and values of each partition column, also + * the base path. 
For example, given: * {{{ * path = hdfs://:/path/to/partition/a=42/b=hello/c=3.14 * }}} - * it returns: + * it returns the partition: * {{{ * PartitionValues( * Seq("a", "b", "c"), @@ -124,34 +132,40 @@ private[sql] object PartitioningUtils { * Literal.create("hello", StringType), * Literal.create(3.14, FloatType))) * }}} + * and the base path: + * {{{ + * /path/to/partition + * }}} */ private[sql] def parsePartition( path: Path, defaultPartitionName: String, - typeInference: Boolean): Option[PartitionValues] = { + typeInference: Boolean): (Option[PartitionValues], Option[Path]) = { val columns = ArrayBuffer.empty[(String, Literal)] // Old Hadoop versions don't have `Path.isRoot` var finished = path.getParent == null var chopped = path +var basePath = path while (!finished) { // Sometimes (e.g., when speculative task is enabled), temporary directories may be left // uncleaned. Here we simply ignore them. if (chopped.getName.toLowerCase == "_temporary") { -return None +return (None, None) } val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference) maybeColumn.foreach(columns += _) + basePath = chopped chopped = chopped.getParent - finished = may
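To make "invalid dir structure" concrete, here is a hedged illustration with made-up paths of what the new assertion ("Conflicting directory structures detected") rejects: partition discovery now requires every leaf directory to resolve to the same base path.

```scala
// Consistent layout -- one base path (/table), partition columns a and b:
//   /table/a=1/b=hello/part-00000.parquet
//   /table/a=2/b=world/part-00000.parquet
val ok = sqlContext.read.parquet("/table") // infers partition columns a, b

// Conflicting layout -- the two leaves resolve to different base paths
// (/mixed and /mixed/extra); this used to produce a confusing inferred schema
// and now fails fast with the assertion above:
//   /mixed/a=1/part-00000.parquet
//   /mixed/extra/a=2/part-00000.parquet
// val bad = sqlContext.read.parquet("/mixed")
```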
spark git commit: [SPARK-9836][ML] Provide R-like summary statistics for OLS via normal equation solver
Repository: spark Updated Branches: refs/heads/master d6035d97c -> d6f10aa7e [SPARK-9836][ML] Provide R-like summary statistics for OLS via normal equation solver https://issues.apache.org/jira/browse/SPARK-9836 Author: Yanbo Liang Closes #9413 from yanboliang/spark-9836. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6f10aa7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6f10aa7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6f10aa7 Branch: refs/heads/master Commit: d6f10aa7ea2806c0fbcfc31d7dee91d28319fab7 Parents: d6035d9 Author: Yanbo Liang Authored: Tue Nov 3 08:29:07 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 3 08:29:07 2015 -0800 -- .../spark/ml/optim/WeightedLeastSquares.scala | 15 ++- .../spark/ml/regression/LinearRegression.scala | 90 - .../mllib/linalg/CholeskyDecomposition.scala| 16 +++ .../ml/regression/LinearRegressionSuite.scala | 129 +++ 4 files changed, 243 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d6f10aa7/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 3d64f7f..e612a21 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -26,10 +26,12 @@ import org.apache.spark.rdd.RDD * Model fitted by [[WeightedLeastSquares]]. * @param coefficients model coefficients * @param intercept model intercept + * @param diagInvAtWA diagonal of matrix (A^T * W * A)^-1 */ private[ml] class WeightedLeastSquaresModel( val coefficients: DenseVector, -val intercept: Double) extends Serializable +val intercept: Double, +val diagInvAtWA: DenseVector) extends Serializable /** * Weighted least squares solver via normal equation. 
@@ -73,7 +75,9 @@ private[ml] class WeightedLeastSquares( val summary = instances.treeAggregate(new Aggregator)(_.add(_), _.merge(_)) summary.validate() logInfo(s"Number of instances: ${summary.count}.") +val k = summary.k val triK = summary.triK +val wSum = summary.wSum val bBar = summary.bBar val bStd = summary.bStd val aBar = summary.aBar @@ -109,6 +113,11 @@ private[ml] class WeightedLeastSquares( val x = new DenseVector(CholeskyDecomposition.solve(aaBar.values, abBar.values)) +val aaInv = CholeskyDecomposition.inverse(aaBar.values, k) +// aaInv is a packed upper triangular matrix, here we get all elements on diagonal +val diagInvAtWA = new DenseVector((1 to k).map { i => + aaInv(i + (i - 1) * i / 2 - 1) / wSum }.toArray) + // compute intercept val intercept = if (fitIntercept) { bBar - BLAS.dot(aBar, x) @@ -116,7 +125,7 @@ private[ml] class WeightedLeastSquares( 0.0 } -new WeightedLeastSquaresModel(x, intercept) +new WeightedLeastSquaresModel(x, intercept, diagInvAtWA) } } @@ -131,7 +140,7 @@ private[ml] object WeightedLeastSquares { var k: Int = _ var count: Long = _ var triK: Int = _ -private var wSum: Double = _ +var wSum: Double = _ private var wwSum: Double = _ private var bSum: Double = _ private var bbSum: Double = _ http://git-wip-us.apache.org/repos/asf/spark/blob/d6f10aa7/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 6e9c744..c51e304 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -21,6 +21,7 @@ import scala.collection.mutable import breeze.linalg.{DenseVector => BDV} import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN} +import breeze.stats.distributions.StudentsT import org.apache.spark.{Logging, SparkException} import org.apache.spark.annotation.Experimental @@ -36,7 +37,7 @@ import org.apache.spark.mllib.linalg.BLAS._ import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.functions.{col, udf, lit} +import org.apache.spark.sql.functions._ import org.apache.spark.storage.StorageLevel /** @@ -173,8 +174,11 @@ class Lin
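For context on how `diagInvAtWA` feeds the R-like summary: under the usual normal-equation assumptions (and glossing over the exact dispersion and degrees-of-freedom handling, which the patch works out in LinearRegression), the per-coefficient statistics are the standard ones below; the Student's t distribution matches the `StudentsT` import added above.

```latex
\operatorname{se}(\hat\beta_j) = \sqrt{\hat\sigma^2\,\big[(A^\top W A)^{-1}\big]_{jj}},
\qquad t_j = \frac{\hat\beta_j}{\operatorname{se}(\hat\beta_j)},
\qquad p_j = 2\,\Pr\big(T_{\mathrm{df}} > |t_j|\big)
```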
spark git commit: [MINOR][ML] Fix naming conventions of AFTSurvivalRegression coefficients
Repository: spark Updated Branches: refs/heads/master d6f10aa7e -> 3434572b1 [MINOR][ML] Fix naming conventions of AFTSurvivalRegression coefficients Rename ```regressionCoefficients``` back to ```coefficients```, and name ```weights``` to ```parameters```. See discussion [here](https://github.com/apache/spark/pull/9311/files#diff-e277fd0bc21f825d3196b4551c01fe5fR230). mengxr vectorijk dbtsai Author: Yanbo Liang Closes #9431 from yanboliang/aft-coefficients. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3434572b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3434572b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3434572b Branch: refs/heads/master Commit: 3434572b141075f00698d94e6ee80febd3093c3b Parents: d6f10aa Author: Yanbo Liang Authored: Tue Nov 3 08:31:16 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 3 08:31:16 2015 -0800 -- .../ml/regression/AFTSurvivalRegression.scala | 38 ++-- .../regression/AFTSurvivalRegressionSuite.scala | 12 +++ 2 files changed, 25 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3434572b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 4dbbc7d..b7d0958 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -200,17 +200,17 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S val numFeatures = dataset.select($(featuresCol)).take(1)(0).getAs[Vector](0).size /* - The coefficients vector has three parts: + The parameters vector has three parts: the first element: Double, log(sigma), the log of scale parameter the second element: Double, intercept of the beta parameter the third to the end elements: Doubles, regression coefficients vector of the beta parameter */ -val initialCoefficients = Vectors.zeros(numFeatures + 2) +val initialParameters = Vectors.zeros(numFeatures + 2) val states = optimizer.iterations(new CachedDiffFunction(costFun), - initialCoefficients.toBreeze.toDenseVector) + initialParameters.toBreeze.toDenseVector) -val coefficients = { +val parameters = { val arrayBuilder = mutable.ArrayBuilder.make[Double] var state: optimizer.State = null while (states.hasNext) { @@ -227,10 +227,10 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S if (handlePersistence) instances.unpersist() -val regressionCoefficients = Vectors.dense(coefficients.slice(2, coefficients.length)) -val intercept = coefficients(1) -val scale = math.exp(coefficients(0)) -val model = new AFTSurvivalRegressionModel(uid, regressionCoefficients, intercept, scale) +val coefficients = Vectors.dense(parameters.slice(2, parameters.length)) +val intercept = parameters(1) +val scale = math.exp(parameters(0)) +val model = new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale) copyValues(model.setParent(this)) } @@ -251,7 +251,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S @Since("1.6.0") class AFTSurvivalRegressionModel private[ml] ( @Since("1.6.0") override val uid: String, -@Since("1.6.0") val regressionCoefficients: Vector, +@Since("1.6.0") val coefficients: Vector, @Since("1.6.0") val intercept: Double, 
@Since("1.6.0") val scale: Double) extends Model[AFTSurvivalRegressionModel] with AFTSurvivalRegressionParams { @@ -275,7 +275,7 @@ class AFTSurvivalRegressionModel private[ml] ( @Since("1.6.0") def predictQuantiles(features: Vector): Vector = { // scale parameter for the Weibull distribution of lifetime -val lambda = math.exp(BLAS.dot(regressionCoefficients, features) + intercept) +val lambda = math.exp(BLAS.dot(coefficients, features) + intercept) // shape parameter for the Weibull distribution of lifetime val k = 1 / scale val quantiles = $(quantileProbabilities).map { @@ -286,7 +286,7 @@ class AFTSurvivalRegressionModel private[ml] ( @Since("1.6.0") def predict(features: Vector): Double = { -math.exp(BLAS.dot(regressionCoefficients, features) + intercept) +math.exp(BLAS.dot(coefficients, features) + intercept) } @Since("1.6.0") @@ -309,7 +309,7 @@ class AFTSurvivalRegressionModel private[ml] ( @Since("1.6.0") override
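As a side note on the `predictQuantiles` code shown (and truncated) in the diff: with the scale parameter lambda computed from the coefficients and the shape k = 1/scale, the per-probability computation is consistent with the standard Weibull quantile function. The expression below is a hedged reconstruction, not a quote from the patch.

```latex
\lambda = \exp(\beta^\top x + b),\qquad k = 1/\text{scale},\qquad
Q(p) = \lambda\,\big(-\ln(1 - p)\big)^{1/k}
```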
spark git commit: [SPARK-11349][ML] Support transform string label for RFormula
Repository: spark Updated Branches: refs/heads/master 3434572b1 -> f54ff19b1 [SPARK-11349][ML] Support transform string label for RFormula Currently ```RFormula``` can only handle label with ```NumericType``` or ```BinaryType``` (cast it to ```DoubleType``` as the label of Linear Regression training), we should also support label of ```StringType``` which is needed for Logistic Regression (glm with family = "binomial"). For label of ```StringType```, we should use ```StringIndexer``` to transform it to 0-based index. Author: Yanbo Liang Closes #9302 from yanboliang/spark-11349. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f54ff19b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f54ff19b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f54ff19b Branch: refs/heads/master Commit: f54ff19b1edd4903950cb334987a447445fa97ef Parents: 3434572 Author: Yanbo Liang Authored: Tue Nov 3 08:32:37 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 3 08:32:37 2015 -0800 -- .../org/apache/spark/ml/feature/RFormula.scala | 10 +- .../apache/spark/ml/feature/RFormulaSuite.scala | 19 +++ 2 files changed, 28 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f54ff19b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index f9b8400..5c43a41 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -132,6 +132,14 @@ class RFormula(override val uid: String) extends Estimator[RFormulaModel] with R .setOutputCol($(featuresCol)) encoderStages += new VectorAttributeRewriter($(featuresCol), prefixesToRewrite.toMap) encoderStages += new ColumnPruner(tempColumns.toSet) + +if (dataset.schema.fieldNames.contains(resolvedFormula.label) && + dataset.schema(resolvedFormula.label).dataType == StringType) { + encoderStages += new StringIndexer() +.setInputCol(resolvedFormula.label) +.setOutputCol($(labelCol)) +} + val pipelineModel = new Pipeline(uid).setStages(encoderStages.toArray).fit(dataset) copyValues(new RFormulaModel(uid, resolvedFormula, pipelineModel).setParent(this)) } @@ -172,7 +180,7 @@ class RFormulaModel private[feature]( override def transformSchema(schema: StructType): StructType = { checkCanTransform(schema) val withFeatures = pipelineModel.transformSchema(schema) -if (hasLabelCol(schema)) { +if (hasLabelCol(withFeatures)) { withFeatures } else if (schema.exists(_.name == resolvedFormula.label)) { val nullable = schema(resolvedFormula.label).dataType match { http://git-wip-us.apache.org/repos/asf/spark/blob/f54ff19b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala index b560130..dc20a5e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala @@ -107,6 +107,25 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext { assert(result.collect() === expected.collect()) } + test("index string label") { +val formula = new RFormula().setFormula("id ~ a + b") +val original = sqlContext.createDataFrame( + Seq(("male", "foo", 4), ("female", "bar", 4), ("female", 
"bar", 5), ("male", "baz", 5)) +).toDF("id", "a", "b") +val model = formula.fit(original) +val result = model.transform(original) +val resultSchema = model.transformSchema(original.schema) +val expected = sqlContext.createDataFrame( + Seq( +("male", "foo", 4, Vectors.dense(0.0, 1.0, 4.0), 1.0), +("female", "bar", 4, Vectors.dense(1.0, 0.0, 4.0), 0.0), +("female", "bar", 5, Vectors.dense(1.0, 0.0, 5.0), 0.0), +("male", "baz", 5, Vectors.dense(0.0, 0.0, 5.0), 1.0)) +).toDF("id", "a", "b", "features", "label") +// assert(result.schema.toString == resultSchema.toString) +assert(result.collect() === expected.collect()) + } + test("attribute generation") { val formula = new RFormula().setFormula("id ~ a + b") val original = sqlContext.createDataFrame( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional com
spark git commit: Update branch-1.5 for 1.5.2 release.
Repository: spark Updated Branches: refs/heads/branch-1.5 5604ce9c1 -> 979566690 Update branch-1.5 for 1.5.2 release. Author: Reynold Xin Closes #9435 from rxin/patch1.5.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97956669 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97956669 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97956669 Branch: refs/heads/branch-1.5 Commit: 97956669053646f00131073358e53b05d0c3d5d0 Parents: 5604ce9 Author: Reynold Xin Authored: Tue Nov 3 08:50:08 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 3 08:50:08 2015 -0800 -- CHANGES.txt | 397 +++ R/pkg/DESCRIPTION | 2 +- .../main/scala/org/apache/spark/package.scala | 2 +- dev/create-release/generate-changelist.py | 4 +- docs/_config.yml| 4 +- ec2/spark_ec2.py| 6 +- 6 files changed, 407 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/97956669/CHANGES.txt -- diff --git a/CHANGES.txt b/CHANGES.txt index 449afa6..13c6053 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,403 @@ Spark Change Log +Release 1.5.2 + + [SPARK-11188] [SQL] Elide stacktraces in bin/spark-sql for AnalysisExceptions + Dilip Biswal + 2015-11-03 12:14:01 +0100 + Commit: 5604ce9, github.com/apache/spark/pull/9374 + + [SPARK-11424] Guard against double-close() of RecordReaders + Josh Rosen + 2015-10-31 10:47:22 -0700 + Commit: b85bf8f, github.com/apache/spark/pull/9382 + + [SPARK-11434][SPARK-11103][SQL] Fix test ": Filter applied on merged Parquet schema with new column fails" + Yin Huai + 2015-10-30 20:05:07 -0700 + Commit: c9ac0e9, github.com/apache/spark/pull/9387 + + [SPARK-10829] [SPARK-11301] [SQL] fix 2 bugs for filter on partitioned columns (1.5 backport) + Wenchen Fan + 2015-10-30 12:14:53 -0700 + Commit: 6b10ea5, github.com/apache/spark/pull/9371 + + [SPARK-11103][SQL] Filter applied on Merged Parquet shema with new column fail + hyukjinkwon + 2015-10-30 18:17:35 +0800 + Commit: 06d3257, github.com/apache/spark/pull/9327 + + [SPARK-11417] [SQL] no @Override in codegen + Davies Liu + 2015-10-30 00:36:20 -0700 + Commit: 0df2c78, github.com/apache/spark/pull/9372 + + [SPARK-11032] [SQL] correctly handle having + Wenchen Fan + 2015-10-13 17:11:22 -0700 + Commit: bb3b362, github.com/apache/spark/pull/9105 + + [SPARK-11246] [SQL] Table cache for Parquet broken in 1.5 + xin Wu + 2015-10-29 07:42:46 -0700 + Commit: 76d7423, github.com/apache/spark/pull/9326 + + Typo in mllib-evaluation-metrics.md + Mageswaran.D + 2015-10-28 08:46:30 -0700 + Commit: 9e3197a, github.com/apache/spark/pull/9333 + + [SPARK-11303][SQL] filter should not be pushed down into sample + Yanbo Liang + 2015-10-27 11:28:59 +0100 + Commit: 3bd596d, github.com/apache/spark/pull/9294 + + [SPARK-11302][MLLIB] 2) Multivariate Gaussian Model with Covariance matrix returns incorrect answer in some cases + Sean Owen + 2015-10-27 23:07:37 -0700 + Commit: 86ee81e, github.com/apache/spark/pull/9309 + + [SPARK-11270][STREAMING] Add improved equality testing for TopicAndPartition from the Kafka Streaming API + Nick Evans + 2015-10-27 01:29:06 -0700 + Commit: abb0ca7, github.com/apache/spark/pull/9236 + + [SQL][DOC] Minor document fixes in interfaces.scala + Alexander Slesarenko + 2015-10-26 23:49:14 +0100 + Commit: 8a6e63c, github.com/apache/spark/pull/9284 + + [SPARK-5966][WIP] Spark-submit deploy-mode cluster is not compatible with master local> + Kevin Yu + 2015-10-26 09:34:15 + + Commit: a355d0d, github.com/apache/spark/pull/9220 + + [SPARK-11287] 
Fixed class name to properly start TestExecutor from deploy.client.TestClient + Bryan Cutler + 2015-10-25 19:05:45 + + Commit: 74921c2, github.com/apache/spark/pull/9255 + + [SPARK-11299][DOC] Fix link to Scala DataFrame Functions reference + Josh Rosen + 2015-10-25 10:31:44 +0100 + Commit: 36fddb0, github.com/apache/spark/pull/9269 + + Fix typos + Jacek Laskowski + 2015-10-25 01:33:22 +0100 + Commit: 5200a6e, github.com/apache/spark/pull/9250 + + [SPARK-11264] bin/spark-class can't find assembly jars with certain GREP_OPTIONS set + Jeffrey Naisbitt + 2015-10-24 18:21:36 +0100 + Commit: 1cd2d9c, github.com/apache/spark/pull/9231 + + [SPARK-11294][SPARKR] Improve R doc for read.df, write.df, saveAsTable + felixcheung + 2015-10-23 21:42:00 -0700 + Commit: 56f0bb6, github.com/apache/spark/pull/9261 + + [SPARK-10971][SPARKR] RRunner should allow setting path to Rscript. + Sun Rui + 2015-10-23 21:38:04 -0700 + Co
spark git commit: [SPARK-9790][YARN] Expose in WebUI if NodeManager is the reason why executors were killed.
Repository: spark Updated Branches: refs/heads/master f54ff19b1 -> b2e4b314d [SPARK-9790][YARN] Expose in WebUI if NodeManager is the reason why executors were killed. Author: Mark Grover Closes #8093 from markgrover/nm2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2e4b314 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2e4b314 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2e4b314 Branch: refs/heads/master Commit: b2e4b314d989de8cad012bbddba703b31d8378a4 Parents: f54ff19 Author: Mark Grover Authored: Tue Nov 3 08:51:40 2015 -0800 Committer: Marcelo Vanzin Committed: Tue Nov 3 08:51:40 2015 -0800 -- core/src/main/scala/org/apache/spark/TaskEndReason.scala | 8 ++-- .../main/scala/org/apache/spark/rpc/RpcEndpointRef.scala | 4 ++-- .../org/apache/spark/scheduler/TaskSetManager.scala | 4 ++-- .../cluster/CoarseGrainedSchedulerBackend.scala | 5 +++-- .../spark/scheduler/cluster/YarnSchedulerBackend.scala | 1 + .../main/scala/org/apache/spark/util/JsonProtocol.scala | 11 --- .../apache/spark/ui/jobs/JobProgressListenerSuite.scala | 2 +- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 11 ++- 8 files changed, 29 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/TaskEndReason.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 18278b2..13241b7 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -223,8 +223,10 @@ case class TaskCommitDenied( * the task crashed the JVM. */ @DeveloperApi -case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true) - extends TaskFailedReason { +case class ExecutorLostFailure( +execId: String, +exitCausedByApp: Boolean = true, +reason: Option[String]) extends TaskFailedReason { override def toErrorString: String = { val exitBehavior = if (exitCausedByApp) { "caused by one of the running tasks" @@ -232,6 +234,8 @@ case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true) "unrelated to the running tasks" } s"ExecutorLostFailure (executor ${execId} exited due to an issue ${exitBehavior})" +s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})" + + reason.map { r => s" Reason: $r" }.getOrElse("") } override def countTowardsTaskFailures: Boolean = exitCausedByApp http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala index f25710b..623da3e 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala @@ -67,7 +67,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this * method retries, the message handling in the receiver side should be idempotent. * - * Note: this is a blocking action which may cost a lot of time, so don't call it in an message + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message * loop of [[RpcEndpoint]]. 
* * @param message the message to send @@ -82,7 +82,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method * retries, the message handling in the receiver side should be idempotent. * - * Note: this is a blocking action which may cost a lot of time, so don't call it in an message + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message * loop of [[RpcEndpoint]]. * * @param message the message to send http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 9b3fad9..114468c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@
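The change above threads an optional, human-readable reason (for example a YARN NodeManager kill message) into ExecutorLostFailure so the WebUI can show why an executor went away. A minimal, self-contained sketch of how the new field folds into the error string; the class name and the sample message below are placeholders rather than Spark's actual code:

```scala
// Illustrative stand-in for org.apache.spark.ExecutorLostFailure after this change.
case class ExecutorLostFailureSketch(
    execId: String,
    exitCausedByApp: Boolean = true,
    reason: Option[String] = None) {

  def toErrorString: String = {
    val exitBehavior =
      if (exitCausedByApp) "caused by one of the running tasks"
      else "unrelated to the running tasks"
    // The optional reason (e.g. a NodeManager message) is appended only when present.
    s"ExecutorLostFailure (executor $execId exited $exitBehavior)" +
      reason.map(r => s" Reason: $r").getOrElse("")
  }
}

object ExecutorLostFailureExample extends App {
  // Hypothetical message; in Spark the real text arrives from YARN via the scheduler backend.
  println(ExecutorLostFailureSketch(
    "7", exitCausedByApp = true,
    Some("Container killed by YARN for exceeding memory limits.")).toErrorString)
}
```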
[2/2] spark git commit: Preparing development version 1.5.3-SNAPSHOT
Preparing development version 1.5.3-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/072afc6f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/072afc6f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/072afc6f Branch: refs/heads/branch-1.5 Commit: 072afc6f41269ce9948b38a4ade4e3bfa4695cb0 Parents: 49c30c1 Author: Patrick Wendell Authored: Tue Nov 3 08:52:03 2015 -0800 Committer: Patrick Wendell Committed: Tue Nov 3 08:52:03 2015 -0800 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 33 files changed, 33 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/072afc6f/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index f0c6c0c..6114f8c 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/072afc6f/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index fdbbf9d..dd9eb9e 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/072afc6f/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index bdf355f..350aaab 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/072afc6f/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 6b7f72c..0d87f37 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/072afc6f/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 4d43903..3982b3d 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/072afc6f/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index e3fa0c0..033f222 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/072afc6f/external/flume/pom.xml -- diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 700e912..74e9cf4 100644 --- 
a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spa
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.5.2-rc2 [created] 49c30c1f6
[1/2] spark git commit: Preparing Spark release v1.5.2-rc2
Repository: spark Updated Branches: refs/heads/branch-1.5 979566690 -> 072afc6f4 Preparing Spark release v1.5.2-rc2 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49c30c1f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49c30c1f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49c30c1f Branch: refs/heads/branch-1.5 Commit: 49c30c1f65e5f1318dcfa4d1e3a5956d9ce22777 Parents: 9795666 Author: Patrick Wendell Authored: Tue Nov 3 08:51:57 2015 -0800 Committer: Patrick Wendell Committed: Tue Nov 3 08:51:57 2015 -0800 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 33 files changed, 33 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/49c30c1f/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 6114f8c..f0c6c0c 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.3-SNAPSHOT +1.5.2 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/49c30c1f/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index dd9eb9e..fdbbf9d 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.3-SNAPSHOT +1.5.2 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/49c30c1f/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 350aaab..bdf355f 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.3-SNAPSHOT +1.5.2 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/49c30c1f/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 0d87f37..6b7f72c 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.3-SNAPSHOT +1.5.2 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/49c30c1f/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 3982b3d..4d43903 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.3-SNAPSHOT +1.5.2 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/49c30c1f/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 033f222..e3fa0c0 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.3-SNAPSHOT +1.5.2 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/49c30c1f/external/flume/pom.xml -- diff --git a/external/flume/pom.xml 
b/external/flume/pom.xml index 74e9cf4..700e912 100644 --- a/external/flume/pom.xml +++ b/
spark git commit: [SPARK-10978][SQL] Allow data sources to eliminate filters
Repository: spark Updated Branches: refs/heads/master b2e4b314d -> ebf8b0b48 [SPARK-10978][SQL] Allow data sources to eliminate filters This PR adds a new method `unhandledFilters` to `BaseRelation`. Data sources which implement this method properly may avoid the overhead of defensive filtering done by Spark SQL. Author: Cheng Lian Closes #9399 from liancheng/spark-10978.unhandled-filters. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebf8b0b4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebf8b0b4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebf8b0b4 Branch: refs/heads/master Commit: ebf8b0b48deaad64f7ca27051caee763451e2623 Parents: b2e4b31 Author: Cheng Lian Authored: Tue Nov 3 10:07:45 2015 -0800 Committer: Yin Huai Committed: Tue Nov 3 10:07:45 2015 -0800 -- .../datasources/DataSourceStrategy.scala| 131 +++ .../apache/spark/sql/sources/interfaces.scala | 9 ++ .../parquet/ParquetFilterSuite.scala| 2 +- .../spark/sql/sources/FilteredScanSuite.scala | 129 +- .../SimpleTextHadoopFsRelationSuite.scala | 47 ++- .../spark/sql/sources/SimpleTextRelation.scala | 65 - 6 files changed, 315 insertions(+), 68 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ebf8b0b4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 6585986..7265d6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -43,7 +43,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { l, projects, filters, -(a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil +(requestedColumns, allPredicates, _) => + toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) => pruneFilterProject( @@ -266,47 +267,81 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { relation, projects, filterPredicates, - (requestedColumns, pushedFilters) => { -scanBuilder(requestedColumns, selectFilters(pushedFilters).toArray) + (requestedColumns, _, pushedFilters) => { +scanBuilder(requestedColumns, pushedFilters.toArray) }) } - // Based on Catalyst expressions. + // Based on Catalyst expressions. The `scanBuilder` function accepts three arguments: + // + // 1. A `Seq[Attribute]`, containing all required column attributes. Used to handle relation + // traits that support column pruning (e.g. `PrunedScan` and `PrunedFilteredScan`). + // + // 2. A `Seq[Expression]`, containing all gathered Catalyst filter expressions, only used for + // `CatalystScan`. + // + // 3. A `Seq[Filter]`, containing all data source `Filter`s that are converted from (possibly a + // subset of) Catalyst filter expressions and can be handled by `relation`. Used to handle + // relation traits (`CatalystScan` excluded) that support filter push-down (e.g. + // `PrunedFilteredScan` and `HadoopFsRelation`). + // + // Note that 2 and 3 shouldn't be used together. 
protected def pruneFilterProjectRaw( - relation: LogicalRelation, - projects: Seq[NamedExpression], - filterPredicates: Seq[Expression], - scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = { +relation: LogicalRelation, +projects: Seq[NamedExpression], +filterPredicates: Seq[Expression], +scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]) = { val projectSet = AttributeSet(projects.flatMap(_.references)) val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) -val filterCondition = filterPredicates.reduceLeftOption(expressions.And) -val pushedFilters = filterPredicates.map { _ transform { +val candidatePredicates = filterPredicates.map { _ transform { case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes. }} +val (unhandledPredicates, pushedFilters) = + selectFilters(relation.relation, candidatePredicates) + +// A set of column attributes that are only referenced by pushed down filters. We can eliminate +// them fro
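To make the new hook concrete, here is a sketch of a toy PrunedFilteredScan relation that evaluates EqualTo filters itself and hands everything else back through `unhandledFilters`, so Spark SQL only re-applies the predicates the source could not handle. The relation, its data, and the assumed signature `unhandledFilters(filters: Array[Filter]): Array[Filter]` are illustrative, not lifted verbatim from the patch:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Toy relation over the pairs (k, k * 10) for k in 1..10.
class KeyValueRelation(val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(
    StructField("key", IntegerType) :: StructField("value", IntegerType) :: Nil)

  // Only EqualTo is evaluated natively; anything returned here is re-checked by Spark SQL.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(_.isInstanceOf[EqualTo])

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val wantedKeys = filters.collect { case EqualTo("key", v: Int) => v }.toSet
    val rows = (1 to 10)
      .filter(k => wantedKeys.isEmpty || wantedKeys(k))
      .map { k =>
        val full = Map("key" -> k, "value" -> k * 10)
        Row.fromSeq(requiredColumns.map(full)) // honor column pruning
      }
    sqlContext.sparkContext.parallelize(rows)
  }
}
```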
spark git commit: [SPARK-11407][SPARKR] Add doc for running from RStudio
Repository: spark Updated Branches: refs/heads/master ebf8b0b48 -> a9676cc71 [SPARK-11407][SPARKR] Add doc for running from RStudio ![image](https://cloud.githubusercontent.com/assets/8969467/10871746/612ba44a-80a4-11e5-99a0-40b9931dee52.png) (This is without css, but you get the idea) shivaram Author: felixcheung Closes #9401 from felixcheung/rstudioprogrammingguide. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9676cc7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9676cc7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9676cc7 Branch: refs/heads/master Commit: a9676cc7107c5df6c62a58668c4d95ced1238370 Parents: ebf8b0b Author: felixcheung Authored: Tue Nov 3 11:53:10 2015 -0800 Committer: Shivaram Venkataraman Committed: Tue Nov 3 11:53:10 2015 -0800 -- docs/sparkr.md | 46 +++--- 1 file changed, 43 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a9676cc7/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index 497a276..437bd47 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -30,14 +30,22 @@ The entry point into SparkR is the `SparkContext` which connects your R program You can create a `SparkContext` using `sparkR.init` and pass in options such as the application name , any spark packages depended on, etc. Further, to work with DataFrames we will need a `SQLContext`, which can be created from the SparkContext. If you are working from the `sparkR` shell, the -`SQLContext` and `SparkContext` should already be created for you. +`SQLContext` and `SparkContext` should already be created for you, and you would not need to call +`sparkR.init`. + {% highlight r %} sc <- sparkR.init() sqlContext <- sparkRSQL.init(sc) {% endhighlight %} + + +## Starting Up from RStudio -In the event you are creating `SparkContext` instead of using `sparkR` shell or `spark-submit`, you +You can also start SparkR from RStudio. You can connect your R program to a Spark cluster from +RStudio, R shell, Rscript or other R IDEs. To start, make sure SPARK_HOME is set in environment +(you can check [Sys.getenv](https://stat.ethz.ch/R-manual/R-devel/library/base/html/Sys.getenv.html)), +load the SparkR package, and call `sparkR.init` as below. In addition to calling `sparkR.init`, you could also specify certain Spark driver properties. Normally these [Application properties](configuration.html#application-properties) and [Runtime Environment](configuration.html#runtime-environment) cannot be set programmatically, as the @@ -45,9 +53,41 @@ driver JVM process would have been started, in this case SparkR takes care of th them, pass them as you would other configuration properties in the `sparkEnvir` argument to `sparkR.init()`. 
+ {% highlight r %} -sc <- sparkR.init("local[*]", "SparkR", "/home/spark", list(spark.driver.memory="2g")) +if (nchar(Sys.getenv("SPARK_HOME")) < 1) { + Sys.setenv(SPARK_HOME = "/home/spark") +} +library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))) +sc <- sparkR.init(master = "local[*]", sparkEnvir = list(spark.driver.memory="2g")) {% endhighlight %} + + +The following options can be set in `sparkEnvir` with `sparkR.init` from RStudio: + + + Property NameProperty groupspark-submit equivalent + +spark.driver.memory +Application Properties +--driver-memory + + +spark.driver.extraClassPath +Runtime Environment +--driver-class-path + + +spark.driver.extraJavaOptions +Runtime Environment +--driver-java-options + + +spark.driver.extraLibraryPath +Runtime Environment +--driver-library-path + + - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11467][SQL] add Python API for stddev/variance
Repository: spark Updated Branches: refs/heads/master a9676cc71 -> 1d04dc95c [SPARK-11467][SQL] add Python API for stddev/variance Add Python API for stddev/stddev_pop/stddev_samp/variance/var_pop/var_samp/skewness/kurtosis Author: Davies Liu Closes #9424 from davies/py_var. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d04dc95 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d04dc95 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d04dc95 Branch: refs/heads/master Commit: 1d04dc95c0d3caa485936e65b0493bcc9719f27e Parents: a9676cc Author: Davies Liu Authored: Tue Nov 3 13:33:46 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 3 13:33:46 2015 -0800 -- python/pyspark/sql/functions.py | 17 python/pyspark/sql/group.py | 88 .../scala/org/apache/spark/sql/functions.scala | 67 --- 3 files changed, 105 insertions(+), 67 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1d04dc95/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index fa04f4c..2f7c2f4 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -122,6 +122,21 @@ _functions_1_4 = { 'bitwiseNOT': 'Computes bitwise not.', } +_functions_1_6 = { +# unary math functions +"stddev": "Aggregate function: returns the unbiased sample standard deviation of" + + " the expression in a group.", +"stddev_samp": "Aggregate function: returns the unbiased sample standard deviation of" + + " the expression in a group.", +"stddev_pop": "Aggregate function: returns population standard deviation of" + + " the expression in a group.", +"variance": "Aggregate function: returns the population variance of the values in a group.", +"var_samp": "Aggregate function: returns the unbiased variance of the values in a group.", +"var_pop": "Aggregate function: returns the population variance of the values in a group.", +"skewness": "Aggregate function: returns the skewness of the values in a group.", +"kurtosis": "Aggregate function: returns the kurtosis of the values in a group." +} + # math functions that take two arguments as input _binary_mathfunctions = { 'atan2': 'Returns the angle theta from the conversion of rectangular coordinates (x, y) to' + @@ -172,6 +187,8 @@ for _name, _doc in _binary_mathfunctions.items(): globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc)) for _name, _doc in _window_functions.items(): globals()[_name] = since(1.4)(_create_window_function(_name, _doc)) +for _name, _doc in _functions_1_6.items(): +globals()[_name] = since(1.6)(_create_function(_name, _doc)) del _name, _doc http://git-wip-us.apache.org/repos/asf/spark/blob/1d04dc95/python/pyspark/sql/group.py -- diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py index 71c0bcc..946b53e 100644 --- a/python/pyspark/sql/group.py +++ b/python/pyspark/sql/group.py @@ -167,6 +167,94 @@ class GroupedData(object): [Row(sum(age)=7, sum(height)=165)] """ +@df_varargs_api +@since(1.6) +def stddev(self, *cols): +"""Compute the sample standard deviation for each numeric columns for each group. + +:param cols: list of column names (string). Non-numeric columns are ignored. + +>>> df3.groupBy().stddev('age', 'height').collect() +[Row(STDDEV(age)=2.12..., STDDEV(height)=3.53...)] +""" + +@df_varargs_api +@since(1.6) +def stddev_samp(self, *cols): +"""Compute the sample standard deviation for each numeric columns for each group. 
+ +:param cols: list of column names (string). Non-numeric columns are ignored. + +>>> df3.groupBy().stddev_samp('age', 'height').collect() +[Row(STDDEV_SAMP(age)=2.12..., STDDEV_SAMP(height)=3.53...)] +""" + +@df_varargs_api +@since(1.6) +def stddev_pop(self, *cols): +"""Compute the population standard deviation for each numeric columns for each group. + +:param cols: list of column names (string). Non-numeric columns are ignored. + +>>> df3.groupBy().stddev_pop('age', 'height').collect() +[Row(STDDEV_POP(age)=1.5, STDDEV_POP(height)=2.5)] +""" + +@df_varargs_api +@since(1.6) +def variance(self, *cols): +"""Compute the sample variance for each numeric columns for each group. + +:param cols: list of column names (string). Non-numeric columns are ignored. + +>>> df3.grou
spark git commit: [SPARK-11424] Guard against double-close() of RecordReaders (branch-1.3 backport)
Repository: spark Updated Branches: refs/heads/branch-1.3 0ce148533 -> b90e5cba2 [SPARK-11424] Guard against double-close() of RecordReaders (branch-1.3 backport) This is a branch-1.3 backport of #9382, a fix for SPARK-11424. Author: Josh Rosen Closes #9423 from JoshRosen/hadoop-decompressor-pooling-fix-branch-1.3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b90e5cba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b90e5cba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b90e5cba Branch: refs/heads/branch-1.3 Commit: b90e5cba2215869fe858179bf38454c90a64e967 Parents: 0ce1485 Author: Josh Rosen Authored: Tue Nov 3 14:17:51 2015 -0800 Committer: Josh Rosen Committed: Tue Nov 3 14:17:51 2015 -0800 -- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 23 +++--- .../org/apache/spark/rdd/NewHadoopRDD.scala | 25 +--- .../org/apache/spark/util/NextIterator.scala| 4 +++- 3 files changed, 34 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b90e5cba/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 486e86c..e127621 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -254,8 +254,21 @@ class HadoopRDD[K, V]( } override def close() { -try { - reader.close() +if (reader != null) { + // Close the reader and release it. Note: it's very important that we don't close the + // reader more than once, since that exposes us to MAPREDUCE-5918 when running against + // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic + // corruption issues when reading compressed input. + try { +reader.close() + } catch { +case e: Exception => + if (!Utils.inShutdown()) { +logWarning("Exception in RecordReader.close()", e) + } + } finally { +reader = null + } if (bytesReadCallback.isDefined) { inputMetrics.updateBytesRead() } else if (split.inputSplit.value.isInstanceOf[FileSplit] || @@ -269,12 +282,6 @@ class HadoopRDD[K, V]( logWarning("Unable to get input size to set InputMetrics for task", e) } } -} catch { - case e: Exception => { -if (!Utils.inShutdown()) { - logWarning("Exception in RecordReader.close()", e) -} - } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/b90e5cba/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 7fb9484..f863778 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -128,7 +128,7 @@ class NewHadoopRDD[K, V]( configurable.setConf(conf) case _ => } - val reader = format.createRecordReader( + var reader = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) @@ -158,8 +158,21 @@ class NewHadoopRDD[K, V]( } private def close() { -try { - reader.close() +if (reader != null) { + // Close the reader and release it. Note: it's very important that we don't close the + // reader more than once, since that exposes us to MAPREDUCE-5918 when running against + // Hadoop 1.x and older Hadoop 2.x releases. 
That bug can lead to non-deterministic + // corruption issues when reading compressed input. + try { +reader.close() + } catch { +case e: Exception => + if (!Utils.inShutdown()) { +logWarning("Exception in RecordReader.close()", e) + } + } finally { +reader = null + } if (bytesReadCallback.isDefined) { inputMetrics.updateBytesRead() } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] || @@ -173,12 +186,6 @@ class NewHadoopRDD[K, V]( logWarning("Unable to get input size to set InputMetrics for task", e)
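The back-ported idiom is worth spelling out: keep the reader in a `var`, close it inside try/catch, and null the reference in `finally`, so a later close() call becomes a harmless no-op instead of a second close that can trip MAPREDUCE-5918. A stripped-down sketch of that pattern; the wrapper class and the logging are placeholders, not the HadoopRDD code itself:

```scala
import java.io.Closeable

// Generic holder demonstrating the close-once idiom used in HadoopRDD/NewHadoopRDD above.
class CloseOnceHolder[R <: Closeable](private var reader: R) {
  def close(): Unit = {
    if (reader != null) {
      try {
        reader.close()
      } catch {
        case e: Exception =>
          // Spark suppresses this warning while the JVM is shutting down (Utils.inShutdown()).
          System.err.println(s"Exception in RecordReader.close(): $e")
      } finally {
        // Nulling the reference makes any subsequent close() call a no-op.
        reader = null.asInstanceOf[R]
      }
    }
  }
}
```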
spark git commit: [SPARK-11477] [SQL] support create Dataset from RDD
Repository: spark Updated Branches: refs/heads/master 1d04dc95c -> f6fcb4874 [SPARK-11477] [SQL] support create Dataset from RDD Author: Wenchen Fan Closes #9434 from cloud-fan/rdd2ds and squashes the following commits: 0892d72 [Wenchen Fan] support create Dataset from RDD Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6fcb487 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6fcb487 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6fcb487 Branch: refs/heads/master Commit: f6fcb4874ce20a1daa91b7434cf9c0254a89e979 Parents: 1d04dc9 Author: Wenchen Fan Authored: Wed Nov 4 00:15:50 2015 +0100 Committer: Michael Armbrust Committed: Wed Nov 4 00:15:50 2015 +0100 -- .../src/main/scala/org/apache/spark/sql/SQLContext.scala| 9 + .../src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 4 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 7 +++ 3 files changed, 20 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 2cb9443..5ad3871 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -499,6 +499,15 @@ class SQLContext private[sql]( new Dataset[T](this, plan) } + def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = { +val enc = encoderFor[T] +val attributes = enc.schema.toAttributes +val encoded = data.map(d => enc.toRow(d)) +val plan = LogicalRDD(attributes, encoded)(self) + +new Dataset[T](this, plan) + } + /** * Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be * converted to Catalyst rows. 
http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index f460a86..f2904e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -48,6 +48,10 @@ abstract class SQLImplicits { implicit def newBooleanEncoder: Encoder[Boolean] = ExpressionEncoder[Boolean](flat = true) implicit def newStringEncoder: Encoder[String] = ExpressionEncoder[String](flat = true) + implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { +DatasetHolder(_sqlContext.createDataset(rdd)) + } + implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) } http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 5973fa7..3e9b621 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -34,6 +34,13 @@ class DatasetSuite extends QueryTest with SharedSQLContext { data: _*) } + test("toDS with RDD") { +val ds = sparkContext.makeRDD(Seq("a", "b", "c"), 3).toDS() +checkAnswer( + ds.mapPartitions(_ => Iterator(1)), + 1, 1, 1) + } + test("as tuple") { val data = Seq(("a", 1), ("b", 2)).toDF("a", "b") checkAnswer( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
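A quick usage sketch of the new RDD-to-Dataset path, assuming the 1.6-era API in this patch; the master, app name, and sample data are placeholders, and the snippet is written script-style for spark-shell:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("rdd-to-ds"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val rdd = sc.makeRDD(Seq("a", "b", "c"), 3)

val ds1 = sqlContext.createDataset(rdd) // the explicit API added in this commit
val ds2 = rdd.toDS()                    // via the new rddToDatasetHolder implicit

// One element per partition, as in the test above: Array(1, 1, 1)
ds2.mapPartitions(_ => Iterator(1)).collect()
```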
spark git commit: Fix typo in WebUI
Repository: spark Updated Branches: refs/heads/master f6fcb4874 -> 680b4e7bc Fix typo in WebUI Author: Jacek Laskowski Closes #9444 from jaceklaskowski/TImely-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/680b4e7b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/680b4e7b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/680b4e7b Branch: refs/heads/master Commit: 680b4e7bca935dc1569f35fa319bdfb01a12f7e0 Parents: f6fcb48 Author: Jacek Laskowski Authored: Tue Nov 3 15:26:35 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 3 15:26:35 2015 -0800 -- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/680b4e7b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 712782d..51425e5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -49,7 +49,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { ("shuffle-read-time-proportion", "Shuffle Read Time"), ("executor-runtime-proportion", "Executor Computing Time"), ("shuffle-write-time-proportion", "Shuffle Write Time"), -("serialization-time-proportion", "Result Serialization TIme"), +("serialization-time-proportion", "Result Serialization Time"), ("getting-result-time-proportion", "Getting Result Time")) legendPairs.zipWithIndex.map { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Fix typo in WebUI
Repository: spark Updated Branches: refs/heads/branch-1.5 072afc6f4 -> c022c0aa4 Fix typo in WebUI Author: Jacek Laskowski Closes #9444 from jaceklaskowski/TImely-fix. (cherry picked from commit 680b4e7bca935dc1569f35fa319bdfb01a12f7e0) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c022c0aa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c022c0aa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c022c0aa Branch: refs/heads/branch-1.5 Commit: c022c0aa48537ca82f81999b36e8f5f568ec0059 Parents: 072afc6 Author: Jacek Laskowski Authored: Tue Nov 3 15:26:35 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 3 15:26:46 2015 -0800 -- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c022c0aa/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 712782d..51425e5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -49,7 +49,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { ("shuffle-read-time-proportion", "Shuffle Read Time"), ("executor-runtime-proportion", "Executor Computing Time"), ("shuffle-write-time-proportion", "Shuffle Write Time"), -("serialization-time-proportion", "Result Serialization TIme"), +("serialization-time-proportion", "Result Serialization Time"), ("getting-result-time-proportion", "Getting Result Time")) legendPairs.zipWithIndex.map { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11466][CORE] Avoid mockito in multi-threaded FsHistoryProviderSuite test.
Repository: spark Updated Branches: refs/heads/master 680b4e7bc -> 53e9cee3e [SPARK-11466][CORE] Avoid mockito in multi-threaded FsHistoryProviderSuite test. The test functionality should be the same, but without using mockito; logs don't really say anything useful but I suspect it may be the cause of the flakiness, since updating mocks when multiple threads may be using it doesn't work very well. It also allows some other cleanup (= less test code in FsHistoryProvider). Author: Marcelo Vanzin Closes #9425 from vanzin/SPARK-11466. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53e9cee3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53e9cee3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53e9cee3 Branch: refs/heads/master Commit: 53e9cee3e4e845d1f875c487215c0f22503347b1 Parents: 680b4e7 Author: Marcelo Vanzin Authored: Tue Nov 3 16:26:28 2015 -0800 Committer: Marcelo Vanzin Committed: Tue Nov 3 16:26:28 2015 -0800 -- .../deploy/history/FsHistoryProvider.scala | 31 ++- .../deploy/history/FsHistoryProviderSuite.scala | 42 ++-- 2 files changed, 34 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/53e9cee3/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 24aa386..718efc4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -113,35 +113,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } // Conf option used for testing the initialization code. - val initThread = if (!conf.getBoolean("spark.history.testing.skipInitialize", false)) { - initialize(None) -} else { - null -} + val initThread = initialize() - private[history] def initialize(errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = { + private[history] def initialize(): Thread = { if (!isFsInSafeMode()) { startPolling() - return null + null +} else { + startSafeModeCheckThread(None) } + } + private[history] def startSafeModeCheckThread( + errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = { // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait // for the FS to leave safe mode before enabling polling. This allows the main history server // UI to be shown (so that the user can see the HDFS status). -// -// The synchronization in the run() method is needed because of the tests; mockito can -// misbehave if the test is modifying the mocked methods while the thread is calling -// them. val initThread = new Thread(new Runnable() { override def run(): Unit = { try { - clock.synchronized { -while (isFsInSafeMode()) { - logInfo("HDFS is still in safe mode. Waiting...") - val deadline = clock.getTimeMillis() + -TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S) - clock.waitTillTime(deadline) -} + while (isFsInSafeMode()) { +logInfo("HDFS is still in safe mode. 
Waiting...") +val deadline = clock.getTimeMillis() + + TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S) +clock.waitTillTime(deadline) } startPolling() } catch { http://git-wip-us.apache.org/repos/asf/spark/blob/53e9cee3/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 833aab1..5cab17f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -41,7 +41,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.{Logging, SparkConf, SparkFunSuite} import org.apache.spark.io._ import org.apache.spark.scheduler._ -import org.apache.spark.util.{JsonProtocol, ManualClock, Utils} +import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { @@ -423,22 +423,16 @@ class FsHistoryPro
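The initialization logic this suite exercises boils down to: if HDFS is in safe mode, spawn a background thread that polls until safe mode clears, and only then start scanning event logs. A condensed sketch of that polling pattern, with `isFsInSafeMode` and `startPolling` passed in as stand-ins for the provider's private methods (the real code drives the wait through a Clock so tests can advance time manually):

```scala
import java.util.concurrent.TimeUnit

// Wait for HDFS to leave safe mode in a daemon thread, then begin polling for event logs.
def startSafeModeCheckThread(
    isFsInSafeMode: () => Boolean,
    startPolling: () => Unit,
    checkIntervalSeconds: Long = 5L): Thread = {
  val t = new Thread(new Runnable {
    override def run(): Unit = {
      while (isFsInSafeMode()) {
        // Sleep between probes instead of busy-waiting.
        Thread.sleep(TimeUnit.SECONDS.toMillis(checkIntervalSeconds))
      }
      startPolling()
    }
  })
  t.setDaemon(true)
  t.start()
  t
}
```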
spark git commit: [SPARK-11489][SQL] Only include common first order statistics in GroupedData
Repository: spark Updated Branches: refs/heads/master 53e9cee3e -> 5051262d4 [SPARK-11489][SQL] Only include common first order statistics in GroupedData We added a bunch of higher order statistics such as skewness and kurtosis to GroupedData. I don't think they are common enough to justify being listed, since users can always use the normal statistics aggregate functions. That is to say, after this change, we won't support ```scala df.groupBy("key").kurtosis("colA", "colB") ``` However, we will still support ```scala df.groupBy("key").agg(kurtosis(col("colA")), kurtosis(col("colB"))) ``` Author: Reynold Xin Closes #9446 from rxin/SPARK-11489. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5051262d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5051262d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5051262d Branch: refs/heads/master Commit: 5051262d4ca6a2c529c9b1ba86d54cce60a7af17 Parents: 53e9cee Author: Reynold Xin Authored: Tue Nov 3 16:27:56 2015 -0800 Committer: Reynold Xin Committed: Tue Nov 3 16:27:56 2015 -0800 -- python/pyspark/sql/group.py | 88 --- .../org/apache/spark/sql/GroupedData.scala | 146 --- .../apache/spark/sql/JavaDataFrameSuite.java| 1 - 3 files changed, 28 insertions(+), 207 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5051262d/python/pyspark/sql/group.py -- diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py index 946b53e..71c0bcc 100644 --- a/python/pyspark/sql/group.py +++ b/python/pyspark/sql/group.py @@ -167,94 +167,6 @@ class GroupedData(object): [Row(sum(age)=7, sum(height)=165)] """ -@df_varargs_api -@since(1.6) -def stddev(self, *cols): -"""Compute the sample standard deviation for each numeric columns for each group. - -:param cols: list of column names (string). Non-numeric columns are ignored. - ->>> df3.groupBy().stddev('age', 'height').collect() -[Row(STDDEV(age)=2.12..., STDDEV(height)=3.53...)] -""" - -@df_varargs_api -@since(1.6) -def stddev_samp(self, *cols): -"""Compute the sample standard deviation for each numeric columns for each group. - -:param cols: list of column names (string). Non-numeric columns are ignored. - ->>> df3.groupBy().stddev_samp('age', 'height').collect() -[Row(STDDEV_SAMP(age)=2.12..., STDDEV_SAMP(height)=3.53...)] -""" - -@df_varargs_api -@since(1.6) -def stddev_pop(self, *cols): -"""Compute the population standard deviation for each numeric columns for each group. - -:param cols: list of column names (string). Non-numeric columns are ignored. - ->>> df3.groupBy().stddev_pop('age', 'height').collect() -[Row(STDDEV_POP(age)=1.5, STDDEV_POP(height)=2.5)] -""" - -@df_varargs_api -@since(1.6) -def variance(self, *cols): -"""Compute the sample variance for each numeric columns for each group. - -:param cols: list of column names (string). Non-numeric columns are ignored. - ->>> df3.groupBy().variance('age', 'height').collect() -[Row(VARIANCE(age)=2.25, VARIANCE(height)=6.25)] -""" - -@df_varargs_api -@since(1.6) -def var_pop(self, *cols): -"""Compute the sample variance for each numeric columns for each group. - -:param cols: list of column names (string). Non-numeric columns are ignored. - ->>> df3.groupBy().var_pop('age', 'height').collect() -[Row(VAR_POP(age)=2.25, VAR_POP(height)=6.25)] -""" - -@df_varargs_api -@since(1.6) -def var_samp(self, *cols): -"""Compute the sample variance for each numeric columns for each group. - -:param cols: list of column names (string). 
Non-numeric columns are ignored. - ->>> df3.groupBy().var_samp('age', 'height').collect() -[Row(VAR_SAMP(age)=4.5, VAR_SAMP(height)=12.5)] -""" - -@df_varargs_api -@since(1.6) -def skewness(self, *cols): -"""Compute the skewness for each numeric columns for each group. - -:param cols: list of column names (string). Non-numeric columns are ignored. - ->>> df3.groupBy().skewness('age', 'height').collect() -[Row(SKEWNESS(age)=0.0, SKEWNESS(height)=0.0)] -""" - -@df_varargs_api -@since(1.6) -def kurtosis(self, *cols): -"""Compute the kurtosis for each numeric columns for each group. - -:param cols: list of column names (string). Non-numeric columns are ignored. - ->>> df3.groupBy().kurtosis('age', 'height').collect() -[Row(KURTOSIS(age)=-2.0, KU
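In Scala the surviving agg() form looks like the sketch below, assuming `sqlContext.implicits._` is in scope for toDF and that the corresponding aggregate functions are available from `org.apache.spark.sql.functions`; the key and column names are placeholders:

```scala
import org.apache.spark.sql.functions.{col, kurtosis, skewness, stddev}

val df = Seq(("a", 1.0, 2.0), ("a", 3.0, 4.0), ("b", 5.0, 6.0)).toDF("key", "colA", "colB")

// Higher-order statistics go through agg() rather than dedicated GroupedData shortcuts.
df.groupBy("key")
  .agg(stddev(col("colA")), skewness(col("colA")), kurtosis(col("colB")))
  .show()
```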
spark git commit: [DOC] Missing link to R DataFrame API doc
Repository: spark Updated Branches: refs/heads/master 5051262d4 -> d648a4ad5 [DOC] Missing link to R DataFrame API doc Author: lewuathe Author: Lewuathe Closes #9394 from Lewuathe/missing-link-to-R-dataframe. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d648a4ad Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d648a4ad Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d648a4ad Branch: refs/heads/master Commit: d648a4ad546eb05deab1005e92b815b2cbea621b Parents: 5051262 Author: lewuathe Authored: Tue Nov 3 16:38:22 2015 -0800 Committer: Shivaram Venkataraman Committed: Tue Nov 3 16:38:22 2015 -0800 -- R/pkg/R/DataFrame.R | 105 ++--- docs/sql-programming-guide.md | 2 +- 2 files changed, 98 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d648a4ad/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 87a2c66..df5bc81 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -23,15 +23,23 @@ NULL setOldClass("jobj") #' @title S4 class that represents a DataFrame -#' @description DataFrames can be created using functions like -#' \code{jsonFile}, \code{table} etc. +#' @description DataFrames can be created using functions like \link{createDataFrame}, +#' \link{jsonFile}, \link{table} etc. +#' @family dataframe_funcs #' @rdname DataFrame -#' @seealso jsonFile, table #' @docType class #' #' @slot env An R environment that stores bookkeeping states of the DataFrame #' @slot sdf A Java object reference to the backing Scala DataFrame +#' @seealso \link{createDataFrame}, \link{jsonFile}, \link{table} +#' @seealso \url{https://spark.apache.org/docs/latest/sparkr.html#sparkr-dataframes} #' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlContext <- sparkRSQL.init(sc) +#' df <- createDataFrame(sqlContext, faithful) +#'} setClass("DataFrame", slots = list(env = "environment", sdf = "jobj")) @@ -46,7 +54,6 @@ setMethod("initialize", "DataFrame", function(.Object, sdf, isCached) { #' @rdname DataFrame #' @export -#' #' @param sdf A Java object reference to the backing Scala DataFrame #' @param isCached TRUE if the dataFrame is cached dataFrame <- function(sdf, isCached = FALSE) { @@ -61,6 +68,7 @@ dataFrame <- function(sdf, isCached = FALSE) { #' #' @param x A SparkSQL DataFrame #' +#' @family dataframe_funcs #' @rdname printSchema #' @name printSchema #' @export @@ -85,6 +93,7 @@ setMethod("printSchema", #' #' @param x A SparkSQL DataFrame #' +#' @family dataframe_funcs #' @rdname schema #' @name schema #' @export @@ -108,6 +117,7 @@ setMethod("schema", #' #' @param x A SparkSQL DataFrame #' @param extended Logical. If extended is False, explain() only prints the physical plan. +#' @family dataframe_funcs #' @rdname explain #' @name explain #' @export @@ -138,6 +148,7 @@ setMethod("explain", #' #' @param x A SparkSQL DataFrame #' +#' @family dataframe_funcs #' @rdname isLocal #' @name isLocal #' @export @@ -162,6 +173,7 @@ setMethod("isLocal", #' @param x A SparkSQL DataFrame #' @param numRows The number of rows to print. Defaults to 20. 
#' +#' @family dataframe_funcs #' @rdname showDF #' @name showDF #' @export @@ -186,6 +198,7 @@ setMethod("showDF", #' #' @param x A SparkSQL DataFrame #' +#' @family dataframe_funcs #' @rdname show #' @name show #' @export @@ -212,6 +225,7 @@ setMethod("show", "DataFrame", #' #' @param x A SparkSQL DataFrame #' +#' @family dataframe_funcs #' @rdname dtypes #' @name dtypes #' @export @@ -237,6 +251,7 @@ setMethod("dtypes", #' #' @param x A SparkSQL DataFrame #' +#' @family dataframe_funcs #' @rdname columns #' @name columns #' @aliases names @@ -257,6 +272,7 @@ setMethod("columns", }) }) +#' @family dataframe_funcs #' @rdname columns #' @name names setMethod("names", @@ -265,6 +281,7 @@ setMethod("names", columns(x) }) +#' @family dataframe_funcs #' @rdname columns #' @name names<- setMethod("names<-", @@ -283,6 +300,7 @@ setMethod("names<-", #' @param x A SparkSQL DataFrame #' @param tableName A character vector containing the name of the table #' +#' @family dataframe_funcs #' @rdname registerTempTable #' @name registerTempTable #' @export @@ -310,6 +328,7 @@ setMethod("registerTempTable", #' @param overwrite A logical argument indicating whether or not to overwrite #' the existing rows in the table. #' +#' @family dataframe_funcs #' @rdname insertInto #' @name insertInto #' @export @@ -334,6 +353,7 @@ setMethod("insertInto", #' #' @param x A SparkSQL DataFrame
spark git commit: [SPARK-11329] [SQL] Cleanup from spark-11329 fix.
Repository: spark Updated Branches: refs/heads/master d648a4ad5 -> e352de0db [SPARK-11329] [SQL] Cleanup from spark-11329 fix. Author: Nong Closes #9442 from nongli/spark-11483. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e352de0d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e352de0d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e352de0d Branch: refs/heads/master Commit: e352de0db2789919e1e0385b79f29b508a6b2b77 Parents: d648a4a Author: Nong Authored: Tue Nov 3 16:44:37 2015 -0800 Committer: Yin Huai Committed: Tue Nov 3 16:44:37 2015 -0800 -- .../apache/spark/sql/catalyst/SqlParser.scala | 4 +- .../sql/catalyst/analysis/unresolved.scala | 18 + .../scala/org/apache/spark/sql/Column.scala | 6 +- .../org/apache/spark/sql/SQLQuerySuite.scala| 79 4 files changed, 55 insertions(+), 52 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e352de0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 1ba559d..440e9e2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -477,8 +477,8 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser { protected lazy val baseExpression: Parser[Expression] = ( "*" ^^^ UnresolvedStar(None) -| (ident <~ "."). + <~ "*" ^^ { case target => { UnresolvedStar(Option(target)) } -} | primary +| (ident <~ "."). + <~ "*" ^^ { case target => UnresolvedStar(Option(target))} +| primary ) protected lazy val signedPrimary: Parser[Expression] = http://git-wip-us.apache.org/repos/asf/spark/blob/e352de0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 6975662..eae17c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -183,28 +183,16 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu case None => input.output // If there is a table, pick out attributes that are part of this table. case Some(t) => if (t.size == 1) { -input.output.filter(_.qualifiers.filter(resolver(_, t.head)).nonEmpty) +input.output.filter(_.qualifiers.exists(resolver(_, t.head))) } else { List() } } -if (!expandedAttributes.isEmpty) { - if (expandedAttributes.forall(_.isInstanceOf[NamedExpression])) { -return expandedAttributes - } else { -require(expandedAttributes.size == input.output.size) -expandedAttributes.zip(input.output).map { - case (e, originalAttribute) => -Alias(e, originalAttribute.name)(qualifiers = originalAttribute.qualifiers) -} - } - return expandedAttributes -} - -require(target.isDefined) +if (expandedAttributes.nonEmpty) return expandedAttributes // Try to resolve it as a struct expansion. If there is a conflict and both are possible, // (i.e. [name].* is both a table and a struct), the struct path can always be qualified. 
+require(target.isDefined) val attribute = input.resolve(target.get, resolver) if (attribute.isDefined) { // This target resolved to an attribute in child. It must be a struct. Expand it. http://git-wip-us.apache.org/repos/asf/spark/blob/e352de0d/sql/core/src/main/scala/org/apache/spark/sql/Column.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 3cde9d6..c73f696 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -60,8 +60,10 @@ class Column(protected[sql] val expr: Expression) extends Logging { def this(name: String) = this(name match { case "*" => UnresolvedStar(None) -case _ if name.endsWith(".*") => UnresolvedStar(Some(UnresolvedAttribute.parseAttributeName( - name.substring(0, name.length - 2 +case _ if name.endsWith(".*") => { +
spark git commit: [SPARK-11455][SQL] fix case sensitivity of partition by
Repository: spark
Updated Branches: refs/heads/master e352de0db -> 2692bdb7d

[SPARK-11455][SQL] fix case sensitivity of partition by

Depend on `caseSensitive` to do the column name equality check, instead of just `==`.

Author: Wenchen Fan

Closes #9410 from cloud-fan/partition.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2692bdb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2692bdb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2692bdb7

Branch: refs/heads/master
Commit: 2692bdb7dbf36d6247f595d5fd0cb9cda89e1fdd
Parents: e352de0
Author: Wenchen Fan
Authored: Tue Nov 3 20:25:58 2015 -0800
Committer: Yin Huai
Committed: Tue Nov 3 20:25:58 2015 -0800
--
 .../datasources/PartitioningUtils.scala         |  7 ++---
 .../datasources/ResolvedDataSource.scala        | 27 +++-
 .../spark/sql/execution/datasources/rules.scala |  6 +++--
 .../org/apache/spark/sql/DataFrameSuite.scala   | 10 
 4 files changed, 39 insertions(+), 11 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/2692bdb7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 628c5e1..16dc236 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -287,10 +287,11 @@ private[sql] object PartitioningUtils {
 
   def validatePartitionColumnDataTypes(
       schema: StructType,
-      partitionColumns: Array[String]): Unit = {
+      partitionColumns: Array[String],
+      caseSensitive: Boolean): Unit = {
 
-    ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns).foreach { field =>
-      field.dataType match {
+    ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
+      field => field.dataType match {
         case _: AtomicType => // OK
         case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2692bdb7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 54beabb..86a306b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -99,7 +99,8 @@ object ResolvedDataSource extends Logging {
     val maybePartitionsSchema = if (partitionColumns.isEmpty) {
       None
     } else {
-      Some(partitionColumnsSchema(schema, partitionColumns))
+      Some(partitionColumnsSchema(
+        schema, partitionColumns, sqlContext.conf.caseSensitiveAnalysis))
     }
 
     val caseInsensitiveOptions = new CaseInsensitiveMap(options)
@@ -172,14 +173,24 @@ object ResolvedDataSource extends Logging {
 
   def partitionColumnsSchema(
       schema: StructType,
-      partitionColumns: Array[String]): StructType = {
+      partitionColumns: Array[String],
+      caseSensitive: Boolean): StructType = {
+    val equality = columnNameEquality(caseSensitive)
     StructType(partitionColumns.map { col =>
-      schema.find(_.name == col).getOrElse {
+      schema.find(f => equality(f.name, col)).getOrElse {
         throw new RuntimeException(s"Partition column $col not found in schema $schema")
       }
     }).asNullable
   }
 
+  private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
+    if (caseSensitive) {
+      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+    } else {
+      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+    }
+  }
+
   /** Create a [[ResolvedDataSource]] for saving the content of the given DataFrame. */
   def apply(
       sqlContext: SQLContext,
@@ -207,14 +218,18 @@ object ResolvedDataSource extends Logging {
       path.makeQualified(fs.getUri, fs.getWorkingDirectory)
     }
 
-    PartitioningUtils.validatePartitionColumnDataTypes(data.schema, partitionColumns)
+    val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePar
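To make the change concrete: the old `schema.find(_.name == col)` lookup failed to resolve a partition column that differed from the schema field only in case, even under the default case-insensitive analysis; the patch routes the comparison through the resolver chosen by `sqlContext.conf.caseSensitiveAnalysis`. Below is a minimal, standalone Scala sketch of that idea only, not the patch itself and independent of Spark's classes; `PartitionColumnLookup`, `findPartitionColumn`, and the column names are purely illustrative.

    // Illustrative sketch: pick the column-name equality from a caseSensitive flag,
    // mirroring the idea behind catalyst's caseSensitiveResolution /
    // caseInsensitiveResolution referenced in the patch.
    object PartitionColumnLookup {

      def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
        if (caseSensitive) {
          (a: String, b: String) => a == b
        } else {
          (a: String, b: String) => a.equalsIgnoreCase(b)
        }
      }

      // Hypothetical helper: look up a partition column in a schema, modeled here
      // as a plain list of field names.
      def findPartitionColumn(
          fieldNames: Seq[String],
          col: String,
          caseSensitive: Boolean): Option[String] = {
        val equality = columnNameEquality(caseSensitive)
        fieldNames.find(f => equality(f, col))
      }

      def main(args: Array[String]): Unit = {
        val fields = Seq("year", "month")
        // The old lookup behaved like caseSensitive = true regardless of configuration.
        println(findPartitionColumn(fields, "Year", caseSensitive = true))  // None
        // With the configurable equality, the default (case-insensitive) analysis resolves it.
        println(findPartitionColumn(fields, "Year", caseSensitive = false)) // Some(year)
      }
    }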
svn commit: r1712484 - in /spark: _layouts/ images/ site/ site/graphx/ site/images/ site/mllib/ site/news/ site/releases/ site/screencasts/ site/sql/ site/streaming/
Author: pwendell
Date: Wed Nov 4 07:17:45 2015
New Revision: 1712484
URL: http://svn.apache.org/viewvc?rev=1712484&view=rev

Log: Adding trademark to logo

Added:
    spark/images/spark-logo-trademark.png (with props)
    spark/images/spark-logo-trademark.xcf (with props)
    spark/site/images/spark-logo-trademark.png (with props)
    spark/site/images/spark-logo-trademark.xcf (with props)

Modified:
    spark/_layouts/global.html spark/site/community.html spark/site/documentation.html spark/site/downloads.html spark/site/examples.html spark/site/faq.html spark/site/graphx/index.html spark/site/index.html spark/site/mailing-lists.html spark/site/mllib/index.html
    spark/site/news/amp-camp-2013-registration-ope.html spark/site/news/announcing-the-first-spark-summit.html spark/site/news/fourth-spark-screencast-published.html spark/site/news/index.html spark/site/news/nsdi-paper.html spark/site/news/one-month-to-spark-summit-2015.html spark/site/news/proposals-open-for-spark-summit-east.html spark/site/news/registration-open-for-spark-summit-east.html spark/site/news/run-spark-and-shark-on-amazon-emr.html
    spark/site/news/spark-0-6-1-and-0-5-2-released.html spark/site/news/spark-0-6-2-released.html spark/site/news/spark-0-7-0-released.html spark/site/news/spark-0-7-2-released.html spark/site/news/spark-0-7-3-released.html spark/site/news/spark-0-8-0-released.html spark/site/news/spark-0-8-1-released.html spark/site/news/spark-0-9-0-released.html spark/site/news/spark-0-9-1-released.html spark/site/news/spark-0-9-2-released.html spark/site/news/spark-1-0-0-released.html spark/site/news/spark-1-0-1-released.html spark/site/news/spark-1-0-2-released.html spark/site/news/spark-1-1-0-released.html spark/site/news/spark-1-1-1-released.html spark/site/news/spark-1-2-0-released.html spark/site/news/spark-1-2-1-released.html spark/site/news/spark-1-2-2-released.html spark/site/news/spark-1-3-0-released.html spark/site/news/spark-1-4-0-released.html spark/site/news/spark-1-4-1-released.html spark/site/news/spark-1-5-0-released.html spark/site/news/spark-1-5-1-released.html
    spark/site/news/spark-accepted-into-apache-incubator.html spark/site/news/spark-and-shark-in-the-news.html spark/site/news/spark-becomes-tlp.html spark/site/news/spark-featured-in-wired.html spark/site/news/spark-mailing-lists-moving-to-apache.html spark/site/news/spark-meetups.html spark/site/news/spark-screencasts-published.html spark/site/news/spark-summit-2013-is-a-wrap.html spark/site/news/spark-summit-2014-videos-posted.html spark/site/news/spark-summit-2015-videos-posted.html spark/site/news/spark-summit-agenda-posted.html spark/site/news/spark-summit-east-2015-videos-posted.html spark/site/news/spark-summit-east-agenda-posted.html spark/site/news/spark-summit-europe-agenda-posted.html spark/site/news/spark-summit-europe.html spark/site/news/spark-tips-from-quantifind.html spark/site/news/spark-user-survey-and-powered-by-page.html spark/site/news/spark-version-0-6-0-released.html spark/site/news/spark-wins-daytona-gray-sort-100tb-benchmark.html spark/site/news/strata-exercises-now-available-online.html spark/site/news/submit-talks-to-spark-summit-2014.html spark/site/news/submit-talks-to-spark-summit-east-2016.html spark/site/news/two-weeks-to-spark-summit-2014.html spark/site/news/video-from-first-spark-development-meetup.html
    spark/site/releases/spark-release-0-3.html spark/site/releases/spark-release-0-5-0.html spark/site/releases/spark-release-0-5-1.html spark/site/releases/spark-release-0-5-2.html spark/site/releases/spark-release-0-6-0.html spark/site/releases/spark-release-0-6-1.html spark/site/releases/spark-release-0-6-2.html spark/site/releases/spark-release-0-7-0.html spark/site/releases/spark-release-0-7-2.html spark/site/releases/spark-release-0-7-3.html spark/site/releases/spark-release-0-8-0.html spark/site/releases/spark-release-0-8-1.html spark/site/releases/spark-release-0-9-0.html spark/site/releases/spark-release-0-9-1.html spark/site/releases/spark-release-0-9-2.html spark/site/releases/spark-release-1-0-0.html spark/site/releases/spark-release-1-0-1.html spark/site/releases/spark-release-1-0-2.html spark/site/releases/spark-release-1-1-0.html spark/site/releases/spark-release-1-1-1.html spark/site/releases/spark-release-1-2-0.html spark/site/releases/spark-release-1-2-1.html spark/site/releases/spark-release-1-2-2.html spark/site/releases/spark-release-1-3-0.html spark/site/releases/spark-release-1-3-1.html spark/site/releases/spark-release-1-4-0.html spark/site/releases/spark-release-1-4-1.html spark/site/releases/spark-release-1-5-0.html spark/si