[GitHub] spark issue #20019: [SPARK-22361][SQL][TEST] Add unit test for Window Frames
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20019 **[Test build #86191 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86191/testReport)** for PR 20019 at commit [`d1e2454`](https://github.com/apache/spark/commit/d1e24542da403e5417a663c675d73a2f95ac77ef). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20019: [SPARK-22361][SQL][TEST] Add unit test for Window Frames
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/20019 retest this please
[GitHub] spark pull request #20019: [SPARK-22361][SQL][TEST] Add unit test for Window...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20019#discussion_r161860630

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala ---
    @@ -263,21 +263,60 @@ class ExpressionParserSuite extends PlanTest {
           "sum(product + 1) over (partition by ((product / 2) + 1) order by 2)",
           WindowExpression('sum.function('product + 1),
             WindowSpecDefinition(Seq('product / 2 + 1), Seq(Literal(2).asc), UnspecifiedFrame)))
    +  }
    +
    +  test("range/rows window function expressions") {
    --- End diff --

Sorry for the late response. Just update the test case name.
[GitHub] spark pull request #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
GitHub user brkyvz opened a pull request: https://github.com/apache/spark/pull/20279

[SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 APIs

## What changes were proposed in this pull request?

This PR migrates the MemoryStream to DataSourceV2 APIs.

## How was this patch tested?

All existing unit tests in streaming.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/brkyvz/spark memv2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20279.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20279

commit 7c09b376eef6a4e6c118c78ad9459cb55e59e67f
Author: Burak Yavuz
Date: 2018-01-11T16:44:19Z

    save for so far

commit 78c50f860aa13f569669f4ad77f4325d80085c8b
Author: Burak Yavuz
Date: 2018-01-12T18:27:49Z

    Save so far

commit 2777b5b38596a1fb68bcf8ee928aec1a58dc372c
Author: Burak Yavuz
Date: 2018-01-13T01:43:03Z

    save so far

commit 50a541b5890f328a655a7ef1fca4f8480b9a35f0
Author: Burak Yavuz
Date: 2018-01-16T19:14:08Z

    Compiles and I think also runs correctly
[GitHub] spark pull request #20167: [SPARK-16501] [MESOS] Allow providing Mesos princ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20167#discussion_r161860003

    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
    @@ -80,10 +80,27 @@ trait MesosSchedulerUtils extends Logging {
         }
         fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
           conf.get(DRIVER_HOST_ADDRESS)))
    +    conf.getOption("spark.mesos.principal.file")
    +      .orElse(Option(conf.getenv("SPARK_MESOS_PRINCIPAL_FILE"))
    +      .foreach { principalFile =>
    +        val file = io.Source.fromFile(principalFile)
    +        val principal = file.getLines.next()
    +        file.close
    +        fwInfoBuilder.setPrincipal(principal)
    +        credBuilder.setPrincipal(principal)
    +      }
         conf.getOption("spark.mesos.principal").foreach { principal =>
           fwInfoBuilder.setPrincipal(principal)
           credBuilder.setPrincipal(principal)
         }
    +    conf.getOption("spark.mesos.secret.file")
    +      .orElse(Option(conf.getenv("SPARK_MESOS_SECRET_FILE"))
    --- End diff --

You can't read `/proc/<pid>/environ` from other users' processes.

```
$ cat /proc/1/environ
cat: /proc/1/environ: Permission denied
```
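The lookup order in the diff above (config key first, environment variable as a fallback, then the first line of the named file) can be sketched in Python. This is only an illustration under stated assumptions: `resolve_principal` and the plain-mapping `conf`/`env` arguments are hypothetical stand-ins, not Spark or Mesos APIs.

```python
import os
import tempfile
from typing import Mapping, Optional


def resolve_principal(conf: Mapping[str, str],
                      env: Mapping[str, str]) -> Optional[str]:
    """Return the principal stored in a file named by config or environment.

    Hypothetical helper: `conf` and `env` are plain mappings standing in
    for SparkConf and the process environment.
    """
    # Prefer the config key; fall back to the environment variable.
    path = conf.get("spark.mesos.principal.file") or env.get("SPARK_MESOS_PRINCIPAL_FILE")
    if path is None:
        return None
    # The context manager closes the file even if reading fails.
    with open(path, encoding="utf-8") as f:
        first_line = f.readline()
    return first_line.rstrip("\n") or None


# Usage: write a principal to a temporary file and resolve it via the env fallback.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("spark-principal\nignored second line\n")
principal = resolve_principal({}, {"SPARK_MESOS_PRINCIPAL_FILE": tmp.name})
os.unlink(tmp.name)
```

Only the first line of the file is used, matching `file.getLines.next()` in the diff; the `with` block is a design choice of this sketch to guarantee the file is closed.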
[GitHub] spark pull request #20257: [SPARK-23048][ML] Add OneHotEncoderEstimator docu...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20257#discussion_r161859425

    --- Diff: docs/ml-features.md ---
    @@ -775,35 +775,43 @@ for more details on the API.
    -## OneHotEncoder
    +## OneHotEncoder (Deprecated since 2.3.0)
    -[One-hot encoding](http://en.wikipedia.org/wiki/One-hot) maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.
    +Because this existing `OneHotEncoder` is a stateless transformer, it is not usable on new data where the number of categories may differ from the training data. In order to fix this, a new `OneHotEncoderEstimator` was created that produces an `OneHotEncoderModel` when fitting. For more detail, please see the JIRA ticket (https://issues.apache.org/jira/browse/SPARK-13030).
    +
    +`OneHotEncoder` has been deprecated in 2.3.0 and will be removed in 3.0.0. Please use [OneHotEncoderEstimator](ml-features.html#onehotencoderestimator) for one-hot encoding instead.
    +
    +## OneHotEncoderEstimator
    +
    +[One-hot encoding](http://en.wikipedia.org/wiki/One-hot) maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using [StringIndexer](ml-features.html#stringindexer) first.
    --- End diff --

"with at most a single one-value" --> "each output binary vector includes at most a single one-value"
[GitHub] spark pull request #20257: [SPARK-23048][ML] Add OneHotEncoderEstimator docu...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20257#discussion_r161857103

    --- Diff: examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderEstimatorExample.java ---
    @@ -35,41 +34,37 @@ import org.apache.spark.sql.types.StructType;
     // $example off$
     
    -public class JavaOneHotEncoderExample {
    +public class JavaOneHotEncoderEstimatorExample {
       public static void main(String[] args) {
         SparkSession spark = SparkSession
           .builder()
    -      .appName("JavaOneHotEncoderExample")
    +      .appName("JavaOneHotEncoderEstimatorExample")
           .getOrCreate();
     
         // $example on$
    +    // Notice: this categorical features are usually encoded with `StringIndexer`.
         List<Row> data = Arrays.asList(
    -      RowFactory.create(0, "a"),
    -      RowFactory.create(1, "b"),
    -      RowFactory.create(2, "c"),
    -      RowFactory.create(3, "a"),
    -      RowFactory.create(4, "a"),
    -      RowFactory.create(5, "c")
    +      RowFactory.create(0.0, 1.0),
    +      RowFactory.create(1.0, 0.0),
    +      RowFactory.create(2.0, 1.0),
    +      RowFactory.create(0.0, 2.0),
    +      RowFactory.create(0.0, 1.0),
    +      RowFactory.create(2.0, 0.0)
         );
     
         StructType schema = new StructType(new StructField[]{
    -      new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    -      new StructField("category", DataTypes.StringType, false, Metadata.empty())
    +      new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
    +      new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
    --- End diff --

No need to pass the `Metadata.empty()` param; it's a default value. We'd better make the example code simpler.
[GitHub] spark pull request #20257: [SPARK-23048][ML] Add OneHotEncoderEstimator docu...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20257#discussion_r161854406

    --- Diff: docs/ml-features.md ---
    @@ -775,35 +775,43 @@ for more details on the API.
    -## OneHotEncoder
    +## OneHotEncoder (Deprecated since 2.3.0)
    -[One-hot encoding](http://en.wikipedia.org/wiki/One-hot) maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.
    +Because this existing `OneHotEncoder` is a stateless transformer, it is not usable on new data where the number of categories may differ from the training data. In order to fix this, a new `OneHotEncoderEstimator` was created that produces an `OneHotEncoderModel` when fitting. For more detail, please see the JIRA ticket (https://issues.apache.org/jira/browse/SPARK-13030).
    +
    +`OneHotEncoder` has been deprecated in 2.3.0 and will be removed in 3.0.0. Please use [OneHotEncoderEstimator](ml-features.html#onehotencoderestimator) for one-hot encoding instead.
    +
    +## OneHotEncoderEstimator
    +
    +[One-hot encoding](http://en.wikipedia.org/wiki/One-hot) maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using [StringIndexer](ml-features.html#stringindexer) first.
    +
    +`OneHotEncoderEstimator` can handle multi-column. By specifying multiple input columns, it returns a one-hot-encoded output vector column for each input column.
    +
    +`OneHotEncoderEstimator` supports `handleInvalid` parameter to choose how to handle invalid data during transforming data. Available options include 'keep' (invalid data presented as an extra categorical feature) and 'error' (throw an error).
    --- End diff --

"(... to an extra categorical number)"
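To make the wording under review concrete, here is a small plain-Python sketch of one-hot encoding with "at most a single one-value" and the two `handleInvalid` options described in the diff. It is an assumption-laden illustration, not Spark's implementation: the function `one_hot` and its `drop_last` parameter are hypothetical names chosen for this example.

```python
from typing import List


def one_hot(index: int, num_categories: int,
            drop_last: bool = True,
            handle_invalid: str = "error") -> List[float]:
    """Encode a category index as a binary vector (illustrative sketch only)."""
    if handle_invalid not in ("error", "keep"):
        raise ValueError(f"unknown handle_invalid: {handle_invalid}")
    valid = 0 <= index < num_categories
    if not valid and handle_invalid == "error":
        raise ValueError(f"unseen category index: {index}")

    size = num_categories
    if handle_invalid == "keep":
        # Reserve one extra category and map every invalid index to it.
        size += 1
        if not valid:
            index = num_categories

    # Dropping the last category means it is encoded as the all-zeros vector,
    # which is why a vector holds *at most* one 1.0 rather than exactly one.
    length = size - 1 if drop_last else size
    vector = [0.0] * length
    if index < length:
        vector[index] = 1.0
    return vector


first = one_hot(0, 3)                 # [1.0, 0.0]
last = one_hot(2, 3)                  # [0.0, 0.0], the dropped last category
kept = one_hot(5, 3, drop_last=False, handle_invalid="keep")  # [0.0, 0.0, 0.0, 1.0]
```

With `handle_invalid="keep"` the invalid value becomes one more category index, which is the reading the reviewer's suggested wording ("an extra categorical number") points at.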
[GitHub] spark pull request #19224: [SPARK-20990][SQL] Read all JSON documents in fil...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19224#discussion_r161858020

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
    @@ -361,3 +361,78 @@ class JacksonParser(
         }
       }
     }
    +
    +object JacksonParser {
    +  private[spark] def splitDocuments(input: InputStream) = new Iterator[String] {
    +
    +    private implicit class JsonCharacter(char: Char) {
    +      def isJsonObjectFinished(endToken: Option[Char]): Boolean = {
    +        endToken match {
    +          case None => char == '}' || char == ']'
    +          case Some(x) => char == x
    +        }
    +      }
    +    }
    +    private var currentChar: Char = input.read().toChar
    +    private var previousToken: Option[Char] = None
    +    private var nextRecord = readNext
    +
    +    override def hasNext: Boolean = nextRecord.isDefined
    +
    +    override def next(): String = {
    +      if (!hasNext) {
    +        throw new NoSuchElementException("End of stream")
    +      }
    +      val curRecord = nextRecord.get
    +      nextRecord = readNext
    +      curRecord
    +    }
    +
    +    private def moveToNextChar() = {
    +      if (!currentChar.isWhitespace) {
    +        previousToken = Some(currentChar)
    +      }
    +      currentChar = input.read().toChar
    +    }
    +
    +    private def readJsonObject: Option[String] = {
    +      val endToken = currentChar match {
    +        case '{' => Some('}')
    +        case '[' => Some(']')
    +        case _ => None
    +      }
    +
    +      val sb = new StringBuilder()
    +      sb.append(currentChar)
    +      while (!currentChar.isJsonObjectFinished(endToken) && input.available() > 0) {
    +        moveToNextChar()
    +        currentChar match {
    +          case '{' | '[' =>
    --- End diff --

Yes, but then escapes should be taken into account, etc. Then we would nearly have to rewrite the Jackson library logic, which is not desirable. This is the reason why I said that. Sure, I would be happy if we can find a solution together, but I think it is hard with this approach.
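The concern about escapes can be illustrated with a short Python sketch that splits concatenated JSON documents by delegating to a real parser instead of counting brackets by hand. This swaps in a different technique from the one in the diff and is not a proposal from the thread: it uses the standard library's `json.JSONDecoder.raw_decode`, and the function name `split_documents` simply mirrors the Scala method for readability.

```python
import json
from typing import Iterator


def split_documents(text: str) -> Iterator[str]:
    """Yield each top-level JSON document found in `text`, in order."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so skip it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            break
        # The parser tracks strings and escapes, so braces and brackets
        # inside string values do not end the document early.
        _, end = decoder.raw_decode(text, pos)
        yield text[pos:end]
        pos = end


# Usage: the first value contains "}{" and the second an escaped quote
# followed by "]", both of which defeat naive bracket matching.
sample = '{"a": "}{"} [1, "\\"]"]\n{"b": 2}'
documents = list(split_documents(sample))
```

A hand-rolled scanner would have to re-implement string and escape handling to get the same boundaries, which is the cost the comment above is pointing at.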
[GitHub] spark issue #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregation func...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19872 **[Test build #86189 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86189/testReport)** for PR 19872 at commit [`9824bbd`](https://github.com/apache/spark/commit/9824bbd6ca7c85cd493e5e7eef0db15bbaf1ad95).
[GitHub] spark pull request #20236: [SPARK-23044] Error handling for jira assignment
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20236#discussion_r161857471

    --- Diff: dev/merge_spark_pr.py ---
    @@ -298,24 +299,36 @@ def choose_jira_assignee(issue, asf_jira):
         Prompt the user to choose who to assign the issue to in jira, given a list of candidates,
         including the original reporter and all commentors
         """
    -    reporter = issue.fields.reporter
    -    commentors = map(lambda x: x.author, issue.fields.comment.comments)
    -    candidates = set(commentors)
    -    candidates.add(reporter)
    -    candidates = list(candidates)
    -    print("JIRA is unassigned, choose assignee")
    -    for idx, author in enumerate(candidates):
    -        annotations = ["Reporter"] if author == reporter else []
    -        if author in commentors:
    -            annotations.append("Commentor")
    -        print("[%d] %s (%s)" % (idx, author.displayName, ",".join(annotations)))
    -    assignee = raw_input("Enter number of user to assign to (blank to leave unassigned):")
    -    if assignee == "":
    -        return None
    -    else:
    -        assignee = candidates[int(assignee)]
    -        asf_jira.assign_issue(issue.key, assignee.key)
    -        return assignee
    +    while True:
    +        try:
    +            reporter = issue.fields.reporter
    +            commentors = map(lambda x: x.author, issue.fields.comment.comments)
    +            candidates = set(commentors)
    +            candidates.add(reporter)
    +            candidates = list(candidates)
    +            print("JIRA is unassigned, choose assignee")
    +            for idx, author in enumerate(candidates):
    +                if author.key == "apachespark":
    +                    continue
    +                annotations = ["Reporter"] if author == reporter else []
    +                if author in commentors:
    +                    annotations.append("Commentor")
    +                print("[%d] %s (%s)" % (idx, author.displayName, ",".join(annotations)))
    +            raw_assignee = raw_input("Enter number of user, or userid, to assign to (blank to leave unassigned):")
    +            if raw_assignee == "":
    +                return None
    +            else:
    +                try:
    +                    id = int(raw_assignee)
    +                    assignee = candidates[id]
    +                except:
    +                    # assume its a user id, and try to assign (might fail, then we just prompt again)
    --- End diff --

it's
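The "number or userid" interpretation in the diff above can be isolated into a small pure function, which makes it easy to check without a JIRA connection. This is a sketch under assumptions: `parse_assignee_choice` is a hypothetical helper, candidates are plain strings rather than JIRA user objects, and it catches `ValueError`/`IndexError` explicitly where the diff uses a bare `except`.

```python
from typing import Optional, Sequence


def parse_assignee_choice(raw: str, candidates: Sequence[str]) -> Optional[str]:
    """Interpret a prompt reply as a list index, else as a user id."""
    raw = raw.strip()
    if raw == "":
        return None  # blank reply means "leave unassigned"
    try:
        index = int(raw)
        if index < 0:
            # Avoid Python's negative indexing silently picking from the end.
            raise IndexError(index)
        return candidates[index]
    except (ValueError, IndexError):
        # Not a usable index: assume it's a user id and let the caller try it.
        return raw


candidates = ["alice", "bob"]
by_index = parse_assignee_choice("1", candidates)      # "bob"
by_userid = parse_assignee_choice("carol", candidates)  # "carol"
```

Catching only the two expected exceptions keeps unrelated failures (for example a `KeyboardInterrupt`) from being mistaken for a user id.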
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161856927

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -334,34 +339,51 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       object Aggregation extends Strategy {
         def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
           case PhysicalAggregation(
    -        groupingExpressions, aggregateExpressions, resultExpressions, child) =>
    -
    -        val (functionsWithDistinct, functionsWithoutDistinct) =
    -          aggregateExpressions.partition(_.isDistinct)
    -        if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
    -          // This is a sanity check. We should not reach here when we have multiple distinct
    -          // column sets. Our MultipleDistinctRewriter should take care this case.
    -          sys.error("You hit a query analyzer bug. Please report your query to " +
    -              "Spark user mailing list.")
    -        }
    +        groupingExpressions, aggExpressions, resultExpressions, child) =>
    +
    +        if (aggExpressions.forall(expr => expr.isInstanceOf[AggregateExpression])) {
    -        val aggregateOperator =
    -          if (functionsWithDistinct.isEmpty) {
    -            aggregate.AggUtils.planAggregateWithoutDistinct(
    -              groupingExpressions,
    -              aggregateExpressions,
    -              resultExpressions,
    -              planLater(child))
    -          } else {
    -            aggregate.AggUtils.planAggregateWithOneDistinct(
    -              groupingExpressions,
    -              functionsWithDistinct,
    -              functionsWithoutDistinct,
    -              resultExpressions,
    -              planLater(child))
    +          val aggregateExpressions = aggExpressions.map(expr =>
    +            expr.asInstanceOf[AggregateExpression])
    +
    +          val (functionsWithDistinct, functionsWithoutDistinct) =
    +            aggregateExpressions.partition(_.isDistinct)
    +          if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
    +            // This is a sanity check. We should not reach here when we have multiple distinct
    +            // column sets. Our MultipleDistinctRewriter should take care this case.
    +            sys.error("You hit a query analyzer bug. Please report your query to " +
    +              "Spark user mailing list.")
               }
    -        aggregateOperator
    +          val aggregateOperator =
    +            if (functionsWithDistinct.isEmpty) {
    +              aggregate.AggUtils.planAggregateWithoutDistinct(
    +                groupingExpressions,
    +                aggregateExpressions,
    +                resultExpressions,
    +                planLater(child))
    +            } else {
    +              aggregate.AggUtils.planAggregateWithOneDistinct(
    +                groupingExpressions,
    +                functionsWithDistinct,
    +                functionsWithoutDistinct,
    +                resultExpressions,
    +                planLater(child))
    +            }
    +
    +          aggregateOperator
    +        } else if (aggExpressions.forall(expr => expr.isInstanceOf[PythonUDF])) {
    +          val udfExpressions = aggExpressions.map(expr => expr.asInstanceOf[PythonUDF])
    +
    +          Seq(execution.python.AggregateInPandasExec(
    +            groupingExpressions,
    +            udfExpressions,
    +            resultExpressions,
    +            planLater(child)))
    +        } else {
    +          throw new IllegalArgumentException(
    --- End diff --

+1. Let's double check in https://github.com/apache/spark/pull/19872#discussion_r161507315
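The three-way branch in the diff above (all built-in aggregates, all Python UDFs, or a rejected mixture) can be sketched as a tiny dispatch function. Everything here is a hypothetical stand-in written for illustration: the two empty classes only mimic the Scala type names, the returned strings are labels rather than physical plans, and the error message is invented because the original one is cut off in the quoted diff.

```python
from typing import Sequence


class AggregateExpression:
    """Stand-in for a built-in aggregate expression."""


class PythonUDF:
    """Stand-in for a Python user-defined aggregate expression."""


def choose_aggregate_plan(agg_expressions: Sequence[object]) -> str:
    """Pick a planning path based on the kinds of aggregate expressions."""
    if all(isinstance(e, AggregateExpression) for e in agg_expressions):
        # Like Scala's forall, all() is true for an empty sequence,
        # so "no aggregates" takes the built-in path.
        return "builtin"
    if all(isinstance(e, PythonUDF) for e in agg_expressions):
        return "pandas"
    # Mixed kinds are rejected; this message is made up for the sketch.
    raise ValueError("cannot mix built-in aggregates and Python UDFs")


builtin_plan = choose_aggregate_plan([AggregateExpression(), AggregateExpression()])
pandas_plan = choose_aggregate_plan([PythonUDF()])
```

Which exception type to raise in the mixed case is exactly what the thread is still discussing, so the `ValueError` here should not be read as the PR's choice.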
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161856598

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -288,9 +289,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           case PhysicalAggregation(
             namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) =>
    +        require(
    --- End diff --

Yeah, I was thinking about this one too. I prefer `AnalysisException` as well, but want to make sure. @cloud-fan what do you think?
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161856742

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -334,34 +339,51 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       object Aggregation extends Strategy {
         def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
           case PhysicalAggregation(
    --- End diff --

Done. I changed it to case matching.
[GitHub] spark issue #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregation func...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19872 **[Test build #86188 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86188/testReport)** for PR 19872 at commit [`b6b935c`](https://github.com/apache/spark/commit/b6b935cf120b229e9df4d276847312302a116b26).
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161856360

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala ---
    @@ -15,12 +15,31 @@
      * limitations under the License.
      */
    -package org.apache.spark.sql.execution.python
    +package org.apache.spark.sql.catalyst.expressions
    -import org.apache.spark.api.python.PythonFunction
    -import org.apache.spark.sql.catalyst.expressions.{Expression, NonSQLExpression, Unevaluable, UserDefinedExpression}
    +import org.apache.spark.api.python.{PythonEvalType, PythonFunction}
    +import org.apache.spark.sql.catalyst.util.toPrettySQL
     import org.apache.spark.sql.types.DataType
    +/**
    + * Helper functions for PythonUDF
    --- End diff --

I changed it to `[[PythonUDF]]`. I think Scaladoc should use `[[]]`.
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161856206

    --- Diff: python/pyspark/sql/udf.py ---
    @@ -111,6 +111,10 @@ def returnType(self):
                         and not isinstance(self._returnType_placeholder, StructType):
                     raise ValueError("Invalid returnType: returnType must be a StructType for "
                                      "pandas_udf with function type GROUP_MAP")
    +            elif self.evalType == PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF \
    +                    and isinstance(self._returnType_placeholder, (StructType, ArrayType)):
    --- End diff --

Good catch. Yes, I added a check for `MapType` too.
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161855872

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4279,6 +4272,386 @@ def test_unsupported_types(self):
                     df.groupby('id').apply(f).collect()
    +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
    +class GroupbyAggTests(ReusedSQLTestCase):
    +
    +    @property
    +    def data(self):
    +        from pyspark.sql.functions import array, explode, col, lit
    +        return self.spark.range(10).toDF('id') \
    +            .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
    +            .withColumn("v", explode(col('vs'))) \
    +            .drop('vs') \
    +            .withColumn('w', lit(1.0))
    +
    +    @property
    +    def plus_one(self):
    +        from pyspark.sql.functions import udf
    +
    +        @udf('double')
    +        def plus_one(v):
    +            assert isinstance(v, (int, float))
    +            return v + 1
    +        return plus_one
    +
    +    @property
    +    def plus_two(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        @pandas_udf('double', PandasUDFType.SCALAR)
    +        def plus_two(v):
    +            assert isinstance(v, pd.Series)
    +            return v + 2
    +        return plus_two
    +
    +    @property
    +    def mean_udf(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        @pandas_udf('double', PandasUDFType.GROUP_AGG)
    +        def mean_udf(v):
    +            return v.mean()
    +        return mean_udf
    +
    +    @property
    +    def sum_udf(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        @pandas_udf('double', PandasUDFType.GROUP_AGG)
    +        def sum_udf(v):
    +            return v.sum()
    +        return sum_udf
    +
    +    @property
    +    def weighted_mean_udf(self):
    +        import numpy as np
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        @pandas_udf('double', PandasUDFType.GROUP_AGG)
    +        def weighted_mean_udf(v, w):
    +            return np.average(v, weights=w)
    +        return weighted_mean_udf
    +
    +    def test_basic(self):
    +        from pyspark.sql.functions import col, lit, sum, mean
    +
    +        df = self.data
    +        weighted_mean_udf = self.weighted_mean_udf
    +
    +        result1 = df.groupby('id').agg(weighted_mean_udf(df.v, lit(1.0))).sort('id')
    +        expected1 = df.groupby('id').agg(mean(df.v).alias('weighted_mean_udf(v, 1.0)')).sort('id')
    +        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
    +
    +        result2 = df.groupby((col('id') + 1)).agg(weighted_mean_udf(df.v, lit(1.0)))\
    +            .sort(df.id + 1)
    +        expected2 = df.groupby((col('id') + 1))\
    +            .agg(mean(df.v).alias('weighted_mean_udf(v, 1.0)')).sort(df.id + 1)
    +        self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
    +
    +        result3 = df.groupby('id').agg(weighted_mean_udf(df.v, df.w)).sort('id')
    +        expected3 = df.groupby('id').agg(mean(df.v).alias('weighted_mean_udf(v, w)')).sort('id')
    +        self.assertPandasEqual(expected3.toPandas(), result3.toPandas())
    +
    +        result4 = df.groupby((col('id') + 1).alias('id'))\
    +            .agg(weighted_mean_udf(df.v, df.w))\
    +            .sort('id')
    +        expected4 = df.groupby((col('id') + 1).alias('id'))\
    +            .agg(mean(df.v).alias('weighted_mean_udf(v, w)'))\
    +            .sort('id')
    +        self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
    +
    +    def test_array(self):
    +        from pyspark.sql.types import ArrayType, DoubleType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        with QuietTest(self.sc):
    +            with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
    +                @pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUP_AGG)
    +                def mean_and_std_udf(v):
    +                    return [v.mean(), v.std()]
    +
    +    def test_struct(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        with QuietTest(self.sc):
    +            with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
    +                @pandas_udf('mean double, std double', PandasUDFType.GROUP_AGG)
    +                def mean_and_std_udf(v):
    +                    return (v.mean(), v.std())
    +
    +    def test_alias(self):
    +        from pyspark.sql.functions import mean
    +
    +        df = self.data
    +        mean_udf = self.mean_udf
    +
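`test_basic` above compares `weighted_mean_udf` against the built-in `mean`, which only works because every weight in the test data is `1.0` (the `w` column is `lit(1.0)`). The arithmetic behind that expectation can be checked with a plain-Python sketch; `weighted_mean` is a hypothetical helper standing in for `np.average(v, weights=w)`, and the values are those of the `id = 0` group under the `data` property shown in the diff.

```python
from typing import Sequence


def weighted_mean(values: Sequence[float], weights: Sequence[float]) -> float:
    """Return sum(v * w) / sum(w), the weighted average of `values`."""
    total_weight = sum(weights)
    return sum(v * w for v, w in zip(values, weights)) / total_weight


# Group id = 0 in the test data: v = i * 1.0 + id for i in range(20, 30).
values = [float(i) for i in range(20, 30)]
unit_weights = [1.0] * len(values)

plain_mean = sum(values) / len(values)
unit_weighted = weighted_mean(values, unit_weights)

# With non-uniform weights the two quantities generally differ.
skewed = weighted_mean([1.0, 3.0], [3.0, 1.0])
```

With unit weights the weighted mean reduces to the ordinary mean, so the test exercises the UDF plumbing rather than the weighting itself.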
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161855488

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4279,6 +4272,386 @@ def test_unsupported_types(self):
    [same quoted hunk as in discussion_r161855872 above; the rest of this message is cut off in the archive]
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161855220

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4279,6 +4272,386 @@ def test_unsupported_types(self):
    [same quoted hunk as in discussion_r161855872 above; the rest of this message is cut off in the archive]
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161855060 --- Diff: python/pyspark/sql/tests.py --- @@ -4279,6 +4272,386 @@ def test_unsupported_types(self): df.groupby('id').apply(f).collect() +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyAggTests(ReusedSQLTestCase): + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))) \ +.drop('vs') \ +.withColumn('w', lit(1.0)) + +@property +def plus_one(self): +from pyspark.sql.functions import udf + +@udf('double') +def plus_one(v): +assert isinstance(v, (int, float)) +return v + 1 +return plus_one + +@property +def plus_two(self): +import pandas as pd +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.SCALAR) +def plus_two(v): +assert isinstance(v, pd.Series) +return v + 2 +return plus_two + +@property +def mean_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def mean_udf(v): +return v.mean() +return mean_udf + +@property +def sum_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def sum_udf(v): +return v.sum() +return sum_udf + +@property +def weighted_mean_udf(self): +import numpy as np +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def weighted_mean_udf(v, w): +return np.average(v, weights=w) +return weighted_mean_udf + +def test_basic(self): +from pyspark.sql.functions import col, lit, sum, mean + +df = self.data +weighted_mean_udf = self.weighted_mean_udf + +result1 = df.groupby('id').agg(weighted_mean_udf(df.v, lit(1.0))).sort('id') +expected1 = 
df.groupby('id').agg(mean(df.v).alias('weighted_mean_udf(v, 1.0)')).sort('id') +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +result2 = df.groupby((col('id') + 1)).agg(weighted_mean_udf(df.v, lit(1.0)))\ +.sort(df.id + 1) +expected2 = df.groupby((col('id') + 1))\ +.agg(mean(df.v).alias('weighted_mean_udf(v, 1.0)')).sort(df.id + 1) +self.assertPandasEqual(expected2.toPandas(), result2.toPandas()) + +result3 = df.groupby('id').agg(weighted_mean_udf(df.v, df.w)).sort('id') +expected3 = df.groupby('id').agg(mean(df.v).alias('weighted_mean_udf(v, w)')).sort('id') +self.assertPandasEqual(expected3.toPandas(), result3.toPandas()) + +result4 = df.groupby((col('id') + 1).alias('id'))\ +.agg(weighted_mean_udf(df.v, df.w))\ +.sort('id') +expected4 = df.groupby((col('id') + 1).alias('id'))\ +.agg(mean(df.v).alias('weighted_mean_udf(v, w)'))\ +.sort('id') +self.assertPandasEqual(expected4.toPandas(), result4.toPandas()) + +def test_array(self): +from pyspark.sql.types import ArrayType, DoubleType +from pyspark.sql.functions import pandas_udf, PandasUDFType + +with QuietTest(self.sc): +with self.assertRaisesRegexp(NotImplementedError, 'not supported'): +@pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUP_AGG) +def mean_and_std_udf(v): +return [v.mean(), v.std()] + +def test_struct(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +with QuietTest(self.sc): +with self.assertRaisesRegexp(NotImplementedError, 'not supported'): +@pandas_udf('mean double, std double', PandasUDFType.GROUP_AGG) +def mean_and_std_udf(v): +return (v.mean(), v.std()) + +def test_alias(self): +from pyspark.sql.functions import mean + +df = self.data +mean_udf = self.mean_udf +
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161855128 --- Diff: python/pyspark/sql/tests.py --- @@ -4279,6 +4272,386 @@ def test_unsupported_types(self): df.groupby('id').apply(f).collect() +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyAggTests(ReusedSQLTestCase): + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))) \ +.drop('vs') \ +.withColumn('w', lit(1.0)) + +@property +def plus_one(self): +from pyspark.sql.functions import udf + +@udf('double') +def plus_one(v): +assert isinstance(v, (int, float)) +return v + 1 +return plus_one + +@property +def plus_two(self): +import pandas as pd +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.SCALAR) +def plus_two(v): +assert isinstance(v, pd.Series) +return v + 2 +return plus_two + +@property +def mean_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def mean_udf(v): +return v.mean() +return mean_udf + +@property +def sum_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def sum_udf(v): +return v.sum() +return sum_udf + +@property +def weighted_mean_udf(self): +import numpy as np +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def weighted_mean_udf(v, w): +return np.average(v, weights=w) +return weighted_mean_udf + +def test_basic(self): +from pyspark.sql.functions import col, lit, sum, mean + +df = self.data +weighted_mean_udf = self.weighted_mean_udf + +result1 = df.groupby('id').agg(weighted_mean_udf(df.v, lit(1.0))).sort('id') +expected1 = 
df.groupby('id').agg(mean(df.v).alias('weighted_mean_udf(v, 1.0)')).sort('id') +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +result2 = df.groupby((col('id') + 1)).agg(weighted_mean_udf(df.v, lit(1.0)))\ +.sort(df.id + 1) +expected2 = df.groupby((col('id') + 1))\ +.agg(mean(df.v).alias('weighted_mean_udf(v, 1.0)')).sort(df.id + 1) +self.assertPandasEqual(expected2.toPandas(), result2.toPandas()) + +result3 = df.groupby('id').agg(weighted_mean_udf(df.v, df.w)).sort('id') +expected3 = df.groupby('id').agg(mean(df.v).alias('weighted_mean_udf(v, w)')).sort('id') +self.assertPandasEqual(expected3.toPandas(), result3.toPandas()) + +result4 = df.groupby((col('id') + 1).alias('id'))\ +.agg(weighted_mean_udf(df.v, df.w))\ +.sort('id') +expected4 = df.groupby((col('id') + 1).alias('id'))\ +.agg(mean(df.v).alias('weighted_mean_udf(v, w)'))\ +.sort('id') +self.assertPandasEqual(expected4.toPandas(), result4.toPandas()) + +def test_array(self): +from pyspark.sql.types import ArrayType, DoubleType +from pyspark.sql.functions import pandas_udf, PandasUDFType + +with QuietTest(self.sc): +with self.assertRaisesRegexp(NotImplementedError, 'not supported'): +@pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUP_AGG) +def mean_and_std_udf(v): +return [v.mean(), v.std()] + +def test_struct(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +with QuietTest(self.sc): +with self.assertRaisesRegexp(NotImplementedError, 'not supported'): +@pandas_udf('mean double, std double', PandasUDFType.GROUP_AGG) +def mean_and_std_udf(v): +return (v.mean(), v.std()) + +def test_alias(self): +from pyspark.sql.functions import mean + +df = self.data +mean_udf = self.mean_udf +
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161855044 --- Diff: python/pyspark/sql/tests.py --- @@ -4279,6 +4272,386 @@ def test_unsupported_types(self): df.groupby('id').apply(f).collect() +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyAggTests(ReusedSQLTestCase): + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))) \ +.drop('vs') \ +.withColumn('w', lit(1.0)) + +@property +def plus_one(self): +from pyspark.sql.functions import udf + +@udf('double') +def plus_one(v): +assert isinstance(v, (int, float)) +return v + 1 +return plus_one + +@property +def plus_two(self): +import pandas as pd +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.SCALAR) +def plus_two(v): +assert isinstance(v, pd.Series) +return v + 2 +return plus_two + +@property +def mean_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def mean_udf(v): +return v.mean() +return mean_udf + +@property +def sum_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def sum_udf(v): +return v.sum() +return sum_udf + +@property +def weighted_mean_udf(self): +import numpy as np +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def weighted_mean_udf(v, w): +return np.average(v, weights=w) +return weighted_mean_udf + +def test_basic(self): +from pyspark.sql.functions import col, lit, sum, mean + +df = self.data +weighted_mean_udf = self.weighted_mean_udf + +result1 = df.groupby('id').agg(weighted_mean_udf(df.v, lit(1.0))).sort('id') +expected1 = 
df.groupby('id').agg(mean(df.v).alias('weighted_mean_udf(v, 1.0)')).sort('id') +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +result2 = df.groupby((col('id') + 1)).agg(weighted_mean_udf(df.v, lit(1.0)))\ +.sort(df.id + 1) +expected2 = df.groupby((col('id') + 1))\ +.agg(mean(df.v).alias('weighted_mean_udf(v, 1.0)')).sort(df.id + 1) +self.assertPandasEqual(expected2.toPandas(), result2.toPandas()) + +result3 = df.groupby('id').agg(weighted_mean_udf(df.v, df.w)).sort('id') +expected3 = df.groupby('id').agg(mean(df.v).alias('weighted_mean_udf(v, w)')).sort('id') +self.assertPandasEqual(expected3.toPandas(), result3.toPandas()) + +result4 = df.groupby((col('id') + 1).alias('id'))\ +.agg(weighted_mean_udf(df.v, df.w))\ +.sort('id') +expected4 = df.groupby((col('id') + 1).alias('id'))\ +.agg(mean(df.v).alias('weighted_mean_udf(v, w)'))\ +.sort('id') +self.assertPandasEqual(expected4.toPandas(), result4.toPandas()) + +def test_array(self): --- End diff -- Done. Also added test case for map type. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
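For readers following the `test_basic` diff quoted above: the expected frames are built with the built-in `mean` because a weighted mean with a constant weight of 1.0 reduces to the plain mean. The sketch below checks that on one group of the test data (id = 0, so `v` runs from 20.0 to 29.0). It is plain Python written for illustration, not code from the PR; `weighted_mean` and `plain_mean` are hypothetical helpers that mirror what `np.average(v, weights=w)` and `v.mean()` compute.

```python
def weighted_mean(values, weights):
    """Return sum(v_i * w_i) / sum(w_i), the quantity np.average(v, weights=w) computes."""
    total_weight = sum(weights)
    if total_weight == 0:
        raise ZeroDivisionError("weights sum to zero")
    return sum(v * w for v, w in zip(values, weights)) / total_weight


def plain_mean(values):
    """Return the unweighted arithmetic mean."""
    return sum(values) / len(values)


# One group from the test data: for id = 0, v takes the values 20.0 .. 29.0
# and the weight column w is the constant 1.0.
group_v = [i * 1.0 + 0 for i in range(20, 30)]
group_w = [1.0] * len(group_v)

# With unit weights the two aggregates agree, which is why the tests can
# compare the UDF result against mean(df.v).
same = weighted_mean(group_v, group_w) == plain_mean(group_v)
```

With non-constant weights the two aggregates would differ, so a test of that kind would need a different expected value.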
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161854960 --- Diff: python/pyspark/sql/tests.py --- @@ -4279,6 +4272,386 @@ def test_unsupported_types(self): df.groupby('id').apply(f).collect() +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyAggTests(ReusedSQLTestCase): + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))) \ +.drop('vs') \ +.withColumn('w', lit(1.0)) + +@property +def plus_one(self): +from pyspark.sql.functions import udf + +@udf('double') +def plus_one(v): +assert isinstance(v, (int, float)) +return v + 1 +return plus_one + +@property +def plus_two(self): +import pandas as pd +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.SCALAR) +def plus_two(v): +assert isinstance(v, pd.Series) +return v + 2 +return plus_two + +@property +def mean_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def mean_udf(v): +return v.mean() +return mean_udf + +@property +def sum_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def sum_udf(v): +return v.sum() +return sum_udf + +@property +def weighted_mean_udf(self): +import numpy as np +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def weighted_mean_udf(v, w): +return np.average(v, weights=w) +return weighted_mean_udf + +def test_basic(self): +from pyspark.sql.functions import col, lit, sum, mean + +df = self.data +weighted_mean_udf = self.weighted_mean_udf + +result1 = df.groupby('id').agg(weighted_mean_udf(df.v, lit(1.0))).sort('id') --- End diff 
-- Sorry! Yes I added comments to each test case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161854767 --- Diff: python/pyspark/sql/tests.py --- @@ -4279,6 +4272,386 @@ def test_unsupported_types(self): df.groupby('id').apply(f).collect() +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyAggTests(ReusedSQLTestCase): --- End diff -- Changed to `GroupbyAggPandasUDFTests` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161854824 --- Diff: python/pyspark/sql/tests.py --- @@ -4279,6 +4272,386 @@ def test_unsupported_types(self): df.groupby('id').apply(f).collect() +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyAggTests(ReusedSQLTestCase): + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))) \ +.drop('vs') \ +.withColumn('w', lit(1.0)) + +@property +def plus_one(self): --- End diff -- Added prefix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregation func...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19872 **[Test build #86187 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86187/testReport)** for PR 19872 at commit [`9fbf012`](https://github.com/apache/spark/commit/9fbf01275159fb7b16cf11687510746d174a7e1f). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19872: [SPARK-22274][PYTHON][SQL] User-defined aggregati...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r161854698 --- Diff: python/pyspark/sql/tests.py --- @@ -4279,6 +4272,386 @@ def test_unsupported_types(self): df.groupby('id').apply(f).collect() +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyAggTests(ReusedSQLTestCase): + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))) \ +.drop('vs') \ +.withColumn('w', lit(1.0)) + +@property +def plus_one(self): +from pyspark.sql.functions import udf + +@udf('double') +def plus_one(v): +assert isinstance(v, (int, float)) +return v + 1 +return plus_one + +@property +def plus_two(self): +import pandas as pd +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.SCALAR) +def plus_two(v): +assert isinstance(v, pd.Series) +return v + 2 +return plus_two + +@property +def mean_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def mean_udf(v): +return v.mean() +return mean_udf + +@property +def sum_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def sum_udf(v): +return v.sum() +return sum_udf + +@property +def weighted_mean_udf(self): +import numpy as np +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('double', PandasUDFType.GROUP_AGG) +def weighted_mean_udf(v, w): +return np.average(v, weights=w) +return weighted_mean_udf + +def test_basic(self): +from pyspark.sql.functions import col, lit, sum, mean + +df = self.data +weighted_mean_udf = self.weighted_mean_udf + +result1 = df.groupby('id').agg(weighted_mean_udf(df.v, lit(1.0))).sort('id') +expected1 = 
df.groupby('id').agg(mean(df.v).alias('weighted_mean_udf(v, 1.0)')).sort('id') +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +result2 = df.groupby((col('id') + 1)).agg(weighted_mean_udf(df.v, lit(1.0)))\ +.sort(df.id + 1) +expected2 = df.groupby((col('id') + 1))\ +.agg(mean(df.v).alias('weighted_mean_udf(v, 1.0)')).sort(df.id + 1) +self.assertPandasEqual(expected2.toPandas(), result2.toPandas()) + +result3 = df.groupby('id').agg(weighted_mean_udf(df.v, df.w)).sort('id') +expected3 = df.groupby('id').agg(mean(df.v).alias('weighted_mean_udf(v, w)')).sort('id') +self.assertPandasEqual(expected3.toPandas(), result3.toPandas()) + +result4 = df.groupby((col('id') + 1).alias('id'))\ +.agg(weighted_mean_udf(df.v, df.w))\ +.sort('id') +expected4 = df.groupby((col('id') + 1).alias('id'))\ +.agg(mean(df.v).alias('weighted_mean_udf(v, w)'))\ +.sort('id') +self.assertPandasEqual(expected4.toPandas(), result4.toPandas()) + +def test_array(self): +from pyspark.sql.types import ArrayType, DoubleType +from pyspark.sql.functions import pandas_udf, PandasUDFType + +with QuietTest(self.sc): +with self.assertRaisesRegexp(NotImplementedError, 'not supported'): +@pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUP_AGG) +def mean_and_std_udf(v): +return [v.mean(), v.std()] + +def test_struct(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +with QuietTest(self.sc): +with self.assertRaisesRegexp(NotImplementedError, 'not supported'): +@pandas_udf('mean double, std double', PandasUDFType.GROUP_AGG) +def mean_and_std_udf(v): +return (v.mean(), v.std()) + +def test_alias(self): +from pyspark.sql.functions import mean + +df = self.data +mean_udf = self.mean_udf +
[GitHub] spark pull request #19224: [SPARK-20990][SQL] Read all JSON documents in fil...
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/19224#discussion_r161854693 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,3 +361,78 @@ class JacksonParser( } } } + +object JacksonParser { + private[spark] def splitDocuments(input: InputStream) = new Iterator[String] { + +private implicit class JsonCharacter(char: Char) { + def isJsonObjectFinished(endToken: Option[Char]): Boolean = { +endToken match { + case None => char == '}' || char == ']' + case Some(x) => char == x +} + } +} +private var currentChar: Char = input.read().toChar +private var previousToken: Option[Char] = None +private var nextRecord = readNext + +override def hasNext: Boolean = nextRecord.isDefined + +override def next(): String = { + if (!hasNext) { +throw new NoSuchElementException("End of stream") + } + val curRecord = nextRecord.get + nextRecord = readNext + curRecord +} + +private def moveToNextChar() = { + if (!currentChar.isWhitespace) { +previousToken = Some(currentChar) + } + currentChar = input.read().toChar +} + +private def readJsonObject: Option[String] = { + val endToken = currentChar match { +case '{' => Some('}') +case '[' => Some(']') +case _ => None + } + + val sb = new StringBuilder() + sb.append(currentChar) + while (!currentChar.isJsonObjectFinished(endToken) && input.available() > 0) { +moveToNextChar() +currentChar match { + case '{' | '[' => --- End diff -- The problem I presented I think can ne be solved with extending your solution, but as I see you could have more problems in your mind. It would be nice to collect all the cases where the splitDocuments should work and how. Do you have time for this? Then maybe we can find the solution. I would be glad to help by reviewing and thinking together or solving parts of the problem. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
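As a starting point for collecting those cases, here is a minimal Python sketch of splitting a string that holds several concatenated JSON documents. It is not the approach in the PR (which scans an `InputStream` character by character in Scala); it assumes the whole input fits in memory and leans on the standard library's `json.JSONDecoder.raw_decode`, which already copes with brackets inside string values. The function name `split_documents` is borrowed from the diff only for readability.

```python
import json


def split_documents(text):
    """Yield each top-level JSON value in `text` as its original substring."""
    decoder = json.JSONDecoder()
    pos = 0
    length = len(text)
    while pos < length:
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < length and text[pos].isspace():
            pos += 1
        if pos >= length:
            break
        # raw_decode returns the parsed value and the index just past it.
        _, end = decoder.raw_decode(text, pos)
        yield text[pos:end]
        pos = end


# Cases worth listing: a closing brace inside a string, nested containers,
# and documents separated by spaces or newlines.
sample = '{"a": "}"} [1, {"b": 2}]\n{"c": []}'
documents = list(split_documents(sample))
```

Malformed input raises `json.JSONDecodeError` from `raw_decode`; how the real splitter should behave in that case is one of the open questions above.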
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user squito commented on the issue: https://github.com/apache/spark/pull/20236 @vanzin @jerryshao want to take another look? Now it * filters out "apache spark" * lets you enter an arbitrary id * if there's an error, just prompts again sample session: https://gist.github.com/squito/de73fbd0b9c00961377068b91283e04c --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
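The three behaviours listed above can be sketched roughly as follows. This is an illustration only, not the code in the PR: `choose_assignee`, `assign`, `read_input`, and `max_attempts` are all hypothetical names, `assign` stands in for whatever call performs the JIRA assignment, and the attempt limit is an added assumption so the loop cannot run forever.

```python
def choose_assignee(candidates, assign, read_input=input, max_attempts=5):
    """Prompt until an assignment succeeds; return the chosen id, or None.

    `assign` is a hypothetical callable that raises on failure.
    """
    # Filter out the bot account so it is never offered as an assignee.
    choices = [c for c in candidates if c.lower() != "apache spark"]
    for _ in range(max_attempts):
        for index, name in enumerate(choices):
            print("[%d] %s" % (index, name))
        raw = read_input("Enter number of user, or an arbitrary id: ").strip()
        if not raw:
            continue
        # A valid index picks from the list; anything else is used verbatim as an id.
        if raw.isdigit() and int(raw) < len(choices):
            user = choices[int(raw)]
        else:
            user = raw
        try:
            assign(user)
            return user
        except Exception as exc:
            # On any error, report it and prompt again.
            print("Error assigning JIRA, try again: %s" % exc)
    return None
```

Passing `read_input` and `assign` as parameters keeps the loop easy to exercise without a terminal or a JIRA server.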
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20236 **[Test build #86186 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86186/testReport)** for PR 20236 at commit [`7151cf0`](https://github.com/apache/spark/commit/7151cf0e6ca47208393c3fbbe279c8c687df90d0). * This patch **fails Python style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20236 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86186/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20236 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20236 **[Test build #86186 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86186/testReport)** for PR 20236 at commit [`7151cf0`](https://github.com/apache/spark/commit/7151cf0e6ca47208393c3fbbe279c8c687df90d0). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20236 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86185/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20236 **[Test build #86185 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86185/testReport)** for PR 20236 at commit [`4d40b97`](https://github.com/apache/spark/commit/4d40b973003703da10565b228b98607ca2959352). * This patch **fails Python style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20236 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20236 **[Test build #86185 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86185/testReport)** for PR 20236 at commit [`4d40b97`](https://github.com/apache/spark/commit/4d40b973003703da10565b228b98607ca2959352). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20226: [SPARK-23034][SQL] Override `nodeName` for all *ScanExec...
Github user tejasapatil commented on the issue: https://github.com/apache/spark/pull/20226 The test failure does look legitimate to me. I have not been able to reproduce it on my laptop. IntelliJ doesn't treat it as a test case. The command line does recognize it as a test case but hits a runtime failure with a jar mismatch. I am using this to run the test: ``` build/mvn -Phive-thriftserver -Dtest=none -DwildcardSuites=org.apache.spark.sql.hive.thriftserver.HiveThriftBinaryServerSuite test ``` Is there any special setup needed to run these tests? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20269: [SPARK-23029] [DOCS] Specifying default units of configu...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/20269 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20266: [SPARK-23072][SQL][TEST] Add a Unicode schema test for f...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20266 **[Test build #86184 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86184/testReport)** for PR 20266 at commit [`8fec65b`](https://github.com/apache/spark/commit/8fec65b163b32e7592b21b4a6c19c69352f41919). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20266: [SPARK-23072][SQL][TEST] Add a Unicode schema test for f...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20266 **[Test build #86183 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86183/testReport)** for PR 20266 at commit [`fb708b7`](https://github.com/apache/spark/commit/fb708b70fe4bb5d29ef55ace7fc0aae61e831c03). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20023: [SPARK-22036][SQL] Decimal multiplication with high prec...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20023 **[Test build #86182 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86182/testReport)** for PR 20023 at commit [`090659f`](https://github.com/apache/spark/commit/090659fe5f2471462ada0d54c0c855d9fe4aba7e). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20266: [SPARK-23072][SQL][TEST] Add a Unicode schema test for f...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/20266 Anyway, Jenkins seems to be out of order now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20266: [SPARK-23072][SQL][TEST] Add a Unicode schema test for f...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/20266 Oh, I thought you mentioned the suite, @mgaido91. Sorry! I agree with you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20266: [SPARK-23072][SQL][TEST] Add a Unicode schema test for f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20266 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20266: [SPARK-23072][SQL][TEST] Add a Unicode schema test for f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20266 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86178/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20277: [SPARK-23090][SQL] polish ColumnVector
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20277 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86179/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20201: [SPARK-22389][SQL] data source v2 partitioning reporting...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20201 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20276: [SPARK-14948][SQL] disambiguate attributes in join condi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20276 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20201: [SPARK-22389][SQL] data source v2 partitioning reporting...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20201 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86174/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20270: [SPARK-23079][SQL] Fix query constraints propagation wit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20270 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86175/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20270: [SPARK-23079][SQL] Fix query constraints propagation wit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20270 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86176/ Test FAILed.
[GitHub] spark issue #20276: [SPARK-14948][SQL] disambiguate attributes in join condi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20276 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86180/ Test FAILed.
[GitHub] spark issue #20201: [SPARK-22389][SQL] data source v2 partitioning reporting...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20201 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86177/ Test FAILed.
[GitHub] spark issue #20270: [SPARK-23079][SQL] Fix query constraints propagation wit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20270 Merged build finished. Test FAILed.
[GitHub] spark issue #20270: [SPARK-23079][SQL] Fix query constraints propagation wit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20270 Merged build finished. Test FAILed.
[GitHub] spark issue #20277: [SPARK-23090][SQL] polish ColumnVector
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20277 Merged build finished. Test FAILed.
[GitHub] spark issue #20201: [SPARK-22389][SQL] data source v2 partitioning reporting...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20201 Merged build finished. Test FAILed.
[GitHub] spark pull request #20223: [SPARK-23020][core] Fix races in launcher code, t...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20223#discussion_r161834649 --- Diff: launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java --- @@ -48,14 +48,16 @@ public synchronized void disconnect() { @Override public synchronized void kill() { -disconnect(); -if (childProc != null) { - if (childProc.isAlive()) { -childProc.destroyForcibly(); +if (!isDisposed()) { + setState(State.KILLED); --- End diff -- I changed this because the old disconnect code, at least, might change the handle's state. It's easier to put this call first and not have to reason about whether that will happen. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20270: [SPARK-23079][SQL] Fix query constraints propagation wit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20270 Merged build finished. Test FAILed.
[GitHub] spark issue #20270: [SPARK-23079][SQL] Fix query constraints propagation wit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20270 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86173/ Test FAILed.
[GitHub] spark issue #20278: [SPARK-23079][SQL]Fix query constraints propagation with...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20278 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86181/ Test FAILed.
[GitHub] spark issue #20278: [SPARK-23079][SQL]Fix query constraints propagation with...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20278 Merged build finished. Test FAILed.
[GitHub] spark issue #20277: [SPARK-23090][SQL] polish ColumnVector
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/20277 @cloud-fan did you do some benchmarks? I'd like to make sure that the abstract class to interface change does not negatively impact performance.
[GitHub] spark issue #20266: [SPARK-23072][SQL][TEST] Add a Unicode schema test for f...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/20266 @dongjoon-hyun the test case I referred to (the one related to SPARK-22146) doesn't seem to use either of them to me. It is only about reading files with special chars.
[GitHub] spark issue #20023: [SPARK-22036][SQL] Decimal multiplication with high prec...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/20023 @gatorsmile in the migration guide should I put this change in the section Spark 2.2 -> Spark 2.3? Will this change be in Spark 2.3?
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161829582 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala --- @@ -136,10 +137,52 @@ object DecimalType extends AbstractDataType { case DoubleType => DoubleDecimal } + private[sql] def forLiteral(literal: Literal): DecimalType = literal.value match { +case v: Short => fromBigDecimal(BigDecimal(v)) +case v: Int => fromBigDecimal(BigDecimal(v)) +case v: Long => fromBigDecimal(BigDecimal(v)) +case _ => forType(literal.dataType) --- End diff -- this problem was present before this PR. Should we fix it here? Is this fix needed? I guess that if it were a problem, it would already have been reported.
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161828134 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala --- @@ -136,10 +137,52 @@ object DecimalType extends AbstractDataType { case DoubleType => DoubleDecimal } + private[sql] def forLiteral(literal: Literal): DecimalType = literal.value match { +case v: Short => fromBigDecimal(BigDecimal(v)) +case v: Int => fromBigDecimal(BigDecimal(v)) +case v: Long => fromBigDecimal(BigDecimal(v)) +case _ => forType(literal.dataType) --- End diff -- ``` private[sql] def forType(dataType: DataType): DecimalType = dataType match { case ByteType => ByteDecimal case ShortType => ShortDecimal case IntegerType => IntDecimal case LongType => LongDecimal case FloatType => FloatDecimal case DoubleType => DoubleDecimal } ``` This list is incomplete. Is it possible that the input literal is `Literal(null, NullType)`?
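The concern raised about `forType` can be illustrated with a small, self-contained sketch. It does not use Spark's classes: `SimpleType`, `IntKind`, `LongKind` and `NullKind` are hypothetical stand-ins, and the tuples stand in for `DecimalType(precision, scale)`. It only shows the general Scala behavior that a `match` with no case for an input throws `scala.MatchError` at runtime. Whether a `NullType` literal can actually reach `forLiteral` is the open question in the thread and is not settled here.

```scala
object ForTypeSketch {
  // Hypothetical stand-ins for Spark's data types (not the real classes).
  trait SimpleType
  case object IntKind extends SimpleType
  case object LongKind extends SimpleType
  case object NullKind extends SimpleType

  // Deliberately incomplete, like the quoted forType: no case for NullKind.
  // The tuples stand in for DecimalType(precision, scale).
  def forType(t: SimpleType): (Int, Int) = t match {
    case IntKind  => (10, 0)
    case LongKind => (20, 0)
  }

  // A defensive variant that makes the missing case explicit to the caller.
  def forTypeOption(t: SimpleType): Option[(Int, Int)] = t match {
    case IntKind  => Some((10, 0))
    case LongKind => Some((20, 0))
    case _        => None
  }
}
```

Calling `ForTypeSketch.forType(ForTypeSketch.NullKind)` throws a `MatchError`, while `forTypeOption` returns `None` for the same input.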
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161826940 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala --- @@ -243,17 +279,43 @@ object DecimalPrecision extends TypeCoercionRule { // Promote integers inside a binary expression with fixed-precision decimals to decimals, // and fixed-precision decimals in an expression with floats / doubles to doubles case b @ BinaryOperator(left, right) if left.dataType != right.dataType => - (left.dataType, right.dataType) match { -case (t: IntegralType, DecimalType.Fixed(p, s)) => - b.makeCopy(Array(Cast(left, DecimalType.forType(t)), right)) -case (DecimalType.Fixed(p, s), t: IntegralType) => - b.makeCopy(Array(left, Cast(right, DecimalType.forType(t)))) -case (t, DecimalType.Fixed(p, s)) if isFloat(t) => - b.makeCopy(Array(left, Cast(right, DoubleType))) -case (DecimalType.Fixed(p, s), t) if isFloat(t) => - b.makeCopy(Array(Cast(left, DoubleType), right)) -case _ => - b - } + nondecimalLiteralAndDecimal(b).lift((left, right)).getOrElse( +nondecimalNonliteralAndDecimal(b).applyOrElse((left.dataType, right.dataType), + (_: (DataType, DataType)) => b)) } + + /** + * Type coercion for BinaryOperator in which one side is a non-decimal literal numeric, and the + * other side is a decimal. + */ + private def nondecimalLiteralAndDecimal( + b: BinaryOperator): PartialFunction[(Expression, Expression), Expression] = { +// Promote literal integers inside a binary expression with fixed-precision decimals to +// decimals. The precision and scale are the ones needed by the integer value. +case (l: Literal, r) if r.dataType.isInstanceOf[DecimalType] + && l.dataType.isInstanceOf[IntegralType] => + b.makeCopy(Array(Cast(l, DecimalType.forLiteral(l)), r)) --- End diff -- yes, Hive is doing so. That is the reason why I introduced the change (without it, we would have had test failures in spark hive).
I will add this in the comment.
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161826504 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -58,7 +58,7 @@ object Literal { case s: Short => Literal(s, ShortType) case s: String => Literal(UTF8String.fromString(s), StringType) case b: Boolean => Literal(b, BooleanType) -case d: BigDecimal => Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale)) +case d: BigDecimal => Literal(Decimal(d), DecimalType.fromBigDecimal(d)) --- End diff -- no, it's not. It's just code re-usage. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161825917 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -58,7 +58,7 @@ object Literal { case s: Short => Literal(s, ShortType) case s: String => Literal(UTF8String.fromString(s), StringType) case b: Boolean => Literal(b, BooleanType) -case d: BigDecimal => Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale)) +case d: BigDecimal => Literal(Decimal(d), DecimalType.fromBigDecimal(d)) --- End diff -- This is another behavior change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20266: [SPARK-23072][SQL][TEST] Add a Unicode schema test for f...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/20266 @mgaido91 . That suite is using SQL Syntax.
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161825714 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala --- @@ -243,17 +279,43 @@ object DecimalPrecision extends TypeCoercionRule { // Promote integers inside a binary expression with fixed-precision decimals to decimals, // and fixed-precision decimals in an expression with floats / doubles to doubles case b @ BinaryOperator(left, right) if left.dataType != right.dataType => - (left.dataType, right.dataType) match { -case (t: IntegralType, DecimalType.Fixed(p, s)) => - b.makeCopy(Array(Cast(left, DecimalType.forType(t)), right)) -case (DecimalType.Fixed(p, s), t: IntegralType) => - b.makeCopy(Array(left, Cast(right, DecimalType.forType(t)))) -case (t, DecimalType.Fixed(p, s)) if isFloat(t) => - b.makeCopy(Array(left, Cast(right, DoubleType))) -case (DecimalType.Fixed(p, s), t) if isFloat(t) => - b.makeCopy(Array(Cast(left, DoubleType), right)) -case _ => - b - } + nondecimalLiteralAndDecimal(b).lift((left, right)).getOrElse( +nondecimalNonliteralAndDecimal(b).applyOrElse((left.dataType, right.dataType), + (_: (DataType, DataType)) => b)) } + + /** + * Type coercion for BinaryOperator in which one side is a non-decimal literal numeric, and the + * other side is a decimal. + */ + private def nondecimalLiteralAndDecimal( + b: BinaryOperator): PartialFunction[(Expression, Expression), Expression] = { +// Promote literal integers inside a binary expression with fixed-precision decimals to +// decimals. The precision and scale are the ones needed by the integer value. +case (l: Literal, r) if r.dataType.isInstanceOf[DecimalType] + && l.dataType.isInstanceOf[IntegralType] => + b.makeCopy(Array(Cast(l, DecimalType.forLiteral(l)), r)) --- End diff -- Hive is also doing this?
[GitHub] spark issue #20278: [SPARK-23079][SQL]Fix query constraints propagation with...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20278 **[Test build #86181 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86181/testReport)** for PR 20278 at commit [`c02d9b4`](https://github.com/apache/spark/commit/c02d9b4bdccafcdf4008dcdd4c2ad9509c9acd96).
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161824857 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala --- @@ -243,17 +279,43 @@ object DecimalPrecision extends TypeCoercionRule { // Promote integers inside a binary expression with fixed-precision decimals to decimals, // and fixed-precision decimals in an expression with floats / doubles to doubles case b @ BinaryOperator(left, right) if left.dataType != right.dataType => - (left.dataType, right.dataType) match { -case (t: IntegralType, DecimalType.Fixed(p, s)) => - b.makeCopy(Array(Cast(left, DecimalType.forType(t)), right)) -case (DecimalType.Fixed(p, s), t: IntegralType) => - b.makeCopy(Array(left, Cast(right, DecimalType.forType(t)))) -case (t, DecimalType.Fixed(p, s)) if isFloat(t) => - b.makeCopy(Array(left, Cast(right, DoubleType))) -case (DecimalType.Fixed(p, s), t) if isFloat(t) => - b.makeCopy(Array(Cast(left, DoubleType), right)) -case _ => - b - } + nondecimalLiteralAndDecimal(b).lift((left, right)).getOrElse( +nondecimalNonliteralAndDecimal(b).applyOrElse((left.dataType, right.dataType), + (_: (DataType, DataType)) => b)) } + + /** + * Type coercion for BinaryOperator in which one side is a non-decimal literal numeric, and the + * other side is a decimal. + */ + private def nondecimalLiteralAndDecimal( + b: BinaryOperator): PartialFunction[(Expression, Expression), Expression] = { +// Promote literal integers inside a binary expression with fixed-precision decimals to +// decimals. The precision and scale are the ones needed by the integer value. +case (l: Literal, r) if r.dataType.isInstanceOf[DecimalType] + && l.dataType.isInstanceOf[IntegralType] => + b.makeCopy(Array(Cast(l, DecimalType.forLiteral(l)), r)) --- End diff -- Add this example as the code comment?
[GitHub] spark pull request #20278: [SPARK-23079][SQL]Fix query constraints propagati...
GitHub user gengliangwang opened a pull request: https://github.com/apache/spark/pull/20278 [SPARK-23079][SQL]Fix query constraints propagation with aliases ## What changes were proposed in this pull request? Previously, PR #19201 fixed the problem of non-converging constraints. After that, PR #19149 improved the loop so that constraints are inferred only once, so the problem of non-converging constraints is gone. However, the case below will fail. ``` spark.range(5).write.saveAsTable("t") val t = spark.read.table("t") val left = t.withColumn("xid", $"id" + lit(1)).as("x") val right = t.withColumnRenamed("id", "xid").as("y") val df = left.join(right, "xid").filter("id = 3").toDF() checkAnswer(df, Row(4, 3)) ``` This is because `aliasMap` replaces all the aliased children. See the test case in the PR for details. This PR fixes the bug by removing the now-unneeded code for preventing non-converging constraints. It can also be fixed with #20270, but this approach is much simpler and cleans up the code. ## How was this patch tested? Unit test You can merge this pull request into a Git repository by running: $ git pull https://github.com/gengliangwang/spark FixConstraintSimple Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20278.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20278 commit c02d9b4bdccafcdf4008dcdd4c2ad9509c9acd96 Author: Wang Gengliang Date: 2018-01-16T17:05:28Z Simple fix
[GitHub] spark issue #20023: [SPARK-22036][SQL] Decimal multiplication with high prec...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20023 Update the PR description to explain the related difference between Hive and MS SQL Server and also document the solution this PR uses.
[GitHub] spark issue #20276: [SPARK-14948][SQL] disambiguate attributes in join condi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20276 **[Test build #86180 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86180/testReport)** for PR 20276 at commit [`ca31ec5`](https://github.com/apache/spark/commit/ca31ec5c9e5a8cb19827cf1b37bbdc4121296faf).
[GitHub] spark issue #20023: [SPARK-22036][SQL] Decimal multiplication with high prec...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20023 Since this introduces a behavior change, please update the [migration guide of Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html#migration-guide)
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161820321 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala --- @@ -93,41 +97,76 @@ object DecimalPrecision extends TypeCoercionRule { case e: BinaryArithmetic if e.left.isInstanceOf[PromotePrecision] => e case Add(e1 @ DecimalType.Expression(p1, s1), e2 @ DecimalType.Expression(p2, s2)) => - val dt = DecimalType.bounded(max(s1, s2) + max(p1 - s1, p2 - s2) + 1, max(s1, s2)) - CheckOverflow(Add(promotePrecision(e1, dt), promotePrecision(e2, dt)), dt) + val resultScale = max(s1, s2) + val resultType = if (SQLConf.get.decimalOperationsAllowPrecisionLoss) { +DecimalType.adjustPrecisionScale(max(p1 - s1, p2 - s2) + resultScale + 1, --- End diff -- We need to make a decision. You know, we try our best to keep our rule unchanged. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
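For readers following the arithmetic in the removed `Add` rule, here is a minimal standalone sketch of the formula `max(s1, s2) + max(p1 - s1, p2 - s2) + 1` for the result precision and `max(s1, s2)` for the result scale, capped the way `DecimalType.bounded` caps it. It returns plain `(precision, scale)` tuples instead of Spark's `DecimalType`; the object and method names are hypothetical, and the caps of 38 are an assumption matching Spark's `MAX_PRECISION` and `MAX_SCALE`.

```scala
object AddTypeSketch {
  // Assumed caps; they mirror DecimalType.MAX_PRECISION and MAX_SCALE in Spark.
  val MaxPrecision = 38
  val MaxScale = 38

  // Same shape as DecimalType.bounded in the diff, on plain tuples.
  def bounded(precision: Int, scale: Int): (Int, Int) =
    (math.min(precision, MaxPrecision), math.min(scale, MaxScale))

  // Result type of decimal(p1, s1) + decimal(p2, s2) under the old rule:
  // keep the larger scale, keep the larger integral part, add one carry digit.
  def addResultType(p1: Int, s1: Int, p2: Int, s2: Int): (Int, Int) = {
    val resultScale = math.max(s1, s2)
    bounded(math.max(p1 - s1, p2 - s2) + resultScale + 1, resultScale)
  }
}
```

For example, adding a decimal(10, 2) to a decimal(5, 4) needs 8 integral digits, 4 fractional digits and one carry digit, which gives decimal(13, 4). Two decimal(38, 10) operands would need precision 39, which the cap reduces to 38 while leaving the scale untouched; that is the situation the `adjustPrecisionScale` branch in the diff treats differently.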
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161819811 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala --- @@ -136,10 +137,52 @@ object DecimalType extends AbstractDataType { case DoubleType => DoubleDecimal } + private[sql] def forLiteral(literal: Literal): DecimalType = literal.value match { +case v: Short => fromBigDecimal(BigDecimal(v)) +case v: Int => fromBigDecimal(BigDecimal(v)) +case v: Long => fromBigDecimal(BigDecimal(v)) +case _ => forType(literal.dataType) + } + + private[sql] def fromBigDecimal(d: BigDecimal): DecimalType = { +DecimalType(Math.max(d.precision, d.scale), d.scale) + } + private[sql] def bounded(precision: Int, scale: Int): DecimalType = { DecimalType(min(precision, MAX_PRECISION), min(scale, MAX_SCALE)) } + /** + * Scale adjustment implementation is based on Hive's one, which is itself inspired to + * SQLServer's one. In particular, when a result precision is greater than + * {@link #MAX_PRECISION}, the corresponding scale is reduced to prevent the integral part of a + * result from being truncated. + * + * This method is used only when `spark.sql.decimalOperations.allowPrecisionLoss` is set to true. + * + * @param precision + * @param scale + * @return + */ + private[sql] def adjustPrecisionScale(precision: Int, scale: Int): DecimalType = { --- End diff -- Yeah, this part is consistent. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
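The two helpers discussed in this diff can be sketched on plain `(precision, scale)` tuples. `fromBigDecimal` follows the one-line body shown in the diff. The body of `adjustPrecisionScale` is not visible in the quoted hunk, so the version below is only an assumed reading of its doc comment: when the precision exceeds the maximum, the integral digits are preserved and the scale is reduced, but not below a floor. The floor of 6 and the names `MaxPrecision` and `MinimumAdjustedScale` are assumptions for illustration, and negative scales are left out.

```scala
object DecimalSketch {
  // Assumed constants: 38 mirrors Spark's MAX_PRECISION; the floor of 6 is an
  // assumption about the Hive-style rule and is not shown in the quoted diff.
  val MaxPrecision = 38
  val MinimumAdjustedScale = 6

  // Mirrors fromBigDecimal from the diff: the smallest (precision, scale)
  // that can hold the given value.
  def fromBigDecimal(d: BigDecimal): (Int, Int) =
    (math.max(d.precision, d.scale), d.scale)

  // Hypothetical sketch of the scale adjustment described in the doc comment.
  def adjustPrecisionScale(precision: Int, scale: Int): (Int, Int) = {
    require(scale >= 0 && precision >= scale, "sketch covers non-negative scales only")
    if (precision <= MaxPrecision) {
      (precision, scale) // fits already, nothing to adjust
    } else {
      val intDigits = precision - scale
      // Never reduce the scale below the floor (or below the requested scale).
      val minScale = math.min(scale, MinimumAdjustedScale)
      val adjustedScale = math.max(MaxPrecision - intDigits, minScale)
      (MaxPrecision, adjustedScale)
    }
  }
}
```

Under these assumptions a requested decimal(40, 10) becomes decimal(38, 8), keeping all 30 integral digits, while a literal such as `123` maps to decimal(3, 0) through `fromBigDecimal` rather than to the wider default for its integer type.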
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161818562 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala --- @@ -243,17 +279,43 @@ object DecimalPrecision extends TypeCoercionRule { // Promote integers inside a binary expression with fixed-precision decimals to decimals, // and fixed-precision decimals in an expression with floats / doubles to doubles case b @ BinaryOperator(left, right) if left.dataType != right.dataType => - (left.dataType, right.dataType) match { -case (t: IntegralType, DecimalType.Fixed(p, s)) => --- End diff -- we can do ``` (left, right) match { case (l: Literal, r) => ... case (DecimalType.Expression(p, s), r @ IntegralType()) => ... } ```
[GitHub] spark issue #20277: [SPARK-23090][SQL] polish ColumnVector
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20277 **[Test build #86179 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86179/testReport)** for PR 20277 at commit [`ba7ca0f`](https://github.com/apache/spark/commit/ba7ca0f168b35e381a9fd53ca59dd39d9dbd5920).
[GitHub] spark pull request #20277: [SPARK-23090][SQL] polish ColumnVector
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20277#discussion_r161816826 --- Diff: sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java --- @@ -28,21 +28,14 @@ /** * A column vector backed by Apache Arrow. */ -public final class ArrowColumnVector extends ColumnVector { +public final class ArrowColumnVector implements ColumnVector { private final ArrowVectorAccessor accessor; private ArrowColumnVector[] childColumns; - private void ensureAccessible(int index) { --- End diff -- `ColumnVector` is a performance critical place, we don't need index checking here, like other column vector implementations. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20277: [SPARK-23090][SQL] polish ColumnVector
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20277 cc @hvanhovell @kiszk @viirya @gatorsmile
[GitHub] spark pull request #20277: [SPARK-23090][SQL] polish ColumnVector
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/20277 [SPARK-23090][SQL] polish ColumnVector ## What changes were proposed in this pull request? Several improvements: * make ColumnVector an interface instead of abstract class * provide a default implementation for the batch get methods * remove `arrayData`, as it's always the first child column vector * rename `getChildColumn` to `getChild`, which is more concise ## How was this patch tested? existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark column-vector Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20277.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20277 commit ba7ca0f168b35e381a9fd53ca59dd39d9dbd5920 Author: Wenchen Fan Date: 2018-01-16T16:38:28Z polish ColumnVector --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20023: [SPARK-22036][SQL] Decimal multiplication with hi...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20023#discussion_r161815644 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala --- @@ -243,17 +279,43 @@ object DecimalPrecision extends TypeCoercionRule { // Promote integers inside a binary expression with fixed-precision decimals to decimals, // and fixed-precision decimals in an expression with floats / doubles to doubles case b @ BinaryOperator(left, right) if left.dataType != right.dataType => - (left.dataType, right.dataType) match { -case (t: IntegralType, DecimalType.Fixed(p, s)) => --- End diff -- unfortunately this is not really feasible since we match on different things: here we match on `left.dataType` and `right.dataType`, while for literals we match on `left` and `right`
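The two-stage matching discussed in this thread (first on the expressions themselves, then on their data types) relies on chaining two `PartialFunction`s with different input types through `lift`, `getOrElse` and `applyOrElse`. The sketch below shows only that chaining pattern on toy inputs. The names `onValues`, `onTypes` and `coerce` are hypothetical, an `Int` paired with a `BigDecimal` stands in for "integral literal next to a decimal", and string tags stand in for data types and for the rewritten expression.

```scala
object CoercionSketch {
  // Stage 1: matches on the operands themselves (stand-in for the literal case).
  val onValues: PartialFunction[(Any, Any), String] = {
    case (l: Int, _: BigDecimal) => s"cast literal $l to the decimal it needs"
  }

  // Stage 2: matches on the operands' types (string tags stand in for DataType).
  val onTypes: PartialFunction[(String, String), String] = {
    case ("integral", "decimal") => "cast left to the default decimal for its type"
    case ("decimal", "integral") => "cast right to the default decimal for its type"
  }

  // Try stage 1; if it is not defined, try stage 2; otherwise leave unchanged.
  def coerce(left: Any, right: Any, leftType: String, rightType: String): String =
    onValues.lift((left, right)).getOrElse(
      onTypes.applyOrElse((leftType, rightType), (_: (String, String)) => "unchanged"))
}
```

Because the two stages take different tuple types, they cannot be merged into a single `match` without restructuring the patterns, which is the difficulty described in the comment above.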
[GitHub] spark issue #20269: [SPARK-23029] [DOCS] Specifying default units of configu...
Github user ferdonline commented on the issue: https://github.com/apache/spark/pull/20269 retest this please
[GitHub] spark pull request #19224: [SPARK-20990][SQL] Read all JSON documents in fil...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19224#discussion_r161811468 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,3 +361,78 @@ class JacksonParser( } } } + +object JacksonParser { + private[spark] def splitDocuments(input: InputStream) = new Iterator[String] { + +private implicit class JsonCharacter(char: Char) { + def isJsonObjectFinished(endToken: Option[Char]): Boolean = { +endToken match { + case None => char == '}' || char == ']' + case Some(x) => char == x +} + } +} +private var currentChar: Char = input.read().toChar +private var previousToken: Option[Char] = None +private var nextRecord = readNext + +override def hasNext: Boolean = nextRecord.isDefined + +override def next(): String = { + if (!hasNext) { +throw new NoSuchElementException("End of stream") + } + val curRecord = nextRecord.get + nextRecord = readNext + curRecord +} + +private def moveToNextChar() = { + if (!currentChar.isWhitespace) { +previousToken = Some(currentChar) + } + currentChar = input.read().toChar +} + +private def readJsonObject: Option[String] = { + val endToken = currentChar match { +case '{' => Some('}') +case '[' => Some(']') +case _ => None + } + + val sb = new StringBuilder() + sb.append(currentChar) + while (!currentChar.isJsonObjectFinished(endToken) && input.available() > 0) { +moveToNextChar() +currentChar match { + case '{' | '[' => --- End diff -- the main issue is that I would have to subclass the Jackson parsers to make it work with the current approach, since when a JSON document is not valid Jackson doesn't allow getting the string which caused the problem. And we are using many of them. Thus it's gonna be a very big effort. If you have better ideas, please feel free to submit a PR. Thanks.
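As background for the `splitDocuments` discussion, the general idea of cutting a stream of concatenated JSON documents into separate strings without a full parser can be sketched as follows. This is not the PR's implementation, which reads from an `InputStream` and tracks tokens differently. It is a simplified alternative that works on an in-memory `String`, tracks bracket depth, ignores brackets inside string literals, and assumes every top-level document is an object or an array. It does not validate the JSON.

```scala
object JsonSplitSketch {
  // Splits concatenated top-level JSON objects/arrays by tracking nesting depth.
  // Assumption: top-level values are objects or arrays, never bare scalars.
  def splitDocuments(input: String): List[String] = {
    val docs = List.newBuilder[String]
    val sb = new StringBuilder
    var depth = 0          // current nesting level of {} and []
    var inString = false   // true while inside a JSON string literal
    var escaped = false    // true right after a backslash inside a string

    for (c <- input) {
      // Skip whitespace between documents, keep everything inside one.
      if (depth > 0 || !c.isWhitespace) sb.append(c)

      if (inString) {
        if (escaped) escaped = false
        else if (c == '\\') escaped = true
        else if (c == '"') inString = false
      } else {
        c match {
          case '"' => inString = true
          case '{' | '[' => depth += 1
          case '}' | ']' =>
            depth -= 1
            if (depth == 0) { // a top-level document just closed
              docs += sb.toString
              sb.clear()
            }
          case _ => ()
        }
      }
    }
    docs.result()
  }
}
```

A splitter of this kind keeps the raw text of each document available, which matters for the point made in the comment above: when parsing later fails, the offending string can still be reported.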
[GitHub] spark pull request #20019: [SPARK-22361][SQL][TEST] Add unit test for Window...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/20019#discussion_r161810800 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala --- @@ -263,21 +263,60 @@ class ExpressionParserSuite extends PlanTest { "sum(product + 1) over (partition by ((product / 2) + 1) order by 2)", WindowExpression('sum.function('product + 1), WindowSpecDefinition(Seq('product / 2 + 1), Seq(Literal(2).asc), UnspecifiedFrame))) + } + + test("range/rows window function expressions") { --- End diff -- @jiangxb1987 gentle ping --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20259: [SPARK-23066][WEB-UI] Master Page increase master...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20259#discussion_r161808204

    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
    @@ -125,6 +125,8 @@ private[deploy] class Master(
       private var restServer: Option[StandaloneRestServer] = None
       private var restServerBoundPort: Option[Int] = None
    +  var startupTime: Long = 0
    --- End diff --

    `private`
[GitHub] spark pull request #20259: [SPARK-23066][WEB-UI] Master Page increase master...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20259#discussion_r161809265

    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
    @@ -179,6 +181,7 @@ private[deploy] class Master(
         }
         persistenceEngine = persistenceEngine_
         leaderElectionAgent = leaderElectionAgent_
    +    startupTime = System.currentTimeMillis()
    --- End diff --

    This does not compute the time taken to start up.
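To illustrate the distinction the comment above draws: assigning `System.currentTimeMillis()` stores a wall-clock timestamp, whereas a start-up duration is the difference between two clock readings. The sketch below is a hypothetical stand-in, not Spark's `Master`; the names `StartupTimingSketch`, `Service`, `createdAtNanos` and `startupDurationMs` are assumptions made for this example, and it presumes that an elapsed time is what is wanted.

```scala
// Hypothetical example for illustration only; not Spark's Master class.
object StartupTimingSketch {

  /** A service that measures how long its own start-up took.
    * `clock` returns nanoseconds and is injectable so the logic can be tested. */
  class Service(clock: () => Long = () => System.nanoTime()) {
    // First reading: taken when the instance is constructed.
    private val createdAtNanos: Long = clock()
    // Elapsed start-up time in milliseconds; -1 until onStart() has finished.
    private var startupDurationMs: Long = -1L

    def onStart(): Unit = {
      // ... initialization work would run here ...
      // Second reading: the duration is the difference, not the reading itself.
      startupDurationMs = (clock() - createdAtNanos) / 1000000L
    }

    def startupDuration: Long = startupDurationMs
  }
}
```

`System.nanoTime()` is used here because it is intended for measuring elapsed time, while `System.currentTimeMillis()` is the usual choice when the goal is to record when something happened. The field is also kept `private`, in line with the earlier review comment on the same PR.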
[GitHub] spark issue #20269: [SPARK-23029] [DOCS] Specifying default units of configu...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20269

    **[Test build #4054 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4054/testReport)** for PR 20269 at commit [`9d92235`](https://github.com/apache/spark/commit/9d9223570f399f2eb3ebc37e886f2cfbcad0b68d).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
[GitHub] spark issue #20216: [SPARK-23024][WEB-UI]Spark ui about the contents of the ...
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/20216

    This basically just makes the Workers / Apps lists collapsible in the same way as other blocks? That sounds fine.
[GitHub] spark pull request #19224: [SPARK-20990][SQL] Read all JSON documents in fil...
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19224#discussion_r161807543

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
    @@ -361,3 +361,78 @@ class JacksonParser(
         }
       }
     }
    +
    +object JacksonParser {
    +  private[spark] def splitDocuments(input: InputStream) = new Iterator[String] {
    +
    +    private implicit class JsonCharacter(char: Char) {
    +      def isJsonObjectFinished(endToken: Option[Char]): Boolean = {
    +        endToken match {
    +          case None => char == '}' || char == ']'
    +          case Some(x) => char == x
    +        }
    +      }
    +    }
    +    private var currentChar: Char = input.read().toChar
    +    private var previousToken: Option[Char] = None
    +    private var nextRecord = readNext
    +
    +    override def hasNext: Boolean = nextRecord.isDefined
    +
    +    override def next(): String = {
    +      if (!hasNext) {
    +        throw new NoSuchElementException("End of stream")
    +      }
    +      val curRecord = nextRecord.get
    +      nextRecord = readNext
    +      curRecord
    +    }
    +
    +    private def moveToNextChar() = {
    +      if (!currentChar.isWhitespace) {
    +        previousToken = Some(currentChar)
    +      }
    +      currentChar = input.read().toChar
    +    }
    +
    +    private def readJsonObject: Option[String] = {
    +      val endToken = currentChar match {
    +        case '{' => Some('}')
    +        case '[' => Some(']')
    +        case _ => None
    +      }
    +
    +      val sb = new StringBuilder()
    +      sb.append(currentChar)
    +      while (!currentChar.isJsonObjectFinished(endToken) && input.available() > 0) {
    +        moveToNextChar()
    +        currentChar match {
    +          case '{' | '[' =>
    --- End diff --

    It is quite sad, as you probably put a lot of work into it. I hope you can reuse some parts in the next try.
[GitHub] spark issue #20216: [SPARK-23024][WEB-UI]Spark ui about the contents of the ...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20216

    **[Test build #4055 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4055/testReport)** for PR 20216 at commit [`4b6cdbb`](https://github.com/apache/spark/commit/4b6cdbb23d308f0418468cfd3740e4058926570e).
[GitHub] spark issue #20269: [SPARK-23029] [DOCS] Specifying default units of configu...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20269

    **[Test build #4054 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4054/testReport)** for PR 20269 at commit [`9d92235`](https://github.com/apache/spark/commit/9d9223570f399f2eb3ebc37e886f2cfbcad0b68d).
[GitHub] spark issue #20266: [SPARK-23072][SQL][TEST] Add a Unicode schema test for f...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20266

    **[Test build #86178 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86178/testReport)** for PR 20266 at commit [`5afaa28`](https://github.com/apache/spark/commit/5afaa2836133cfc18a52de38d666817991d62c5d).
[GitHub] spark issue #20266: [SPARK-23072][SQL][TEST] Add a Unicode schema test for f...
Github user dongjoon-hyun commented on the issue:

    https://github.com/apache/spark/pull/20266

    Retest this please