[GitHub] spark pull request #14414: [SPARK-16809] enable history server links in disp...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14414#discussion_r73032912

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -152,8 +152,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.sparkUser,
       sc.appName,
       sc.conf,
-      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress))
+      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
--- End diff --

Mind introducing a constant for `spark.mesos.driver.webui.url`?
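A minimal sketch of what such a constant could look like; the object name, visibility, and placement are illustrative assumptions, not part of the PR:

```scala
// Hypothetical holder for the key; the PR would decide the real name and location.
private[mesos] object MesosConfigKeys {
  val DRIVER_WEBUI_URL = "spark.mesos.driver.webui.url"
}

// The call site from the diff would then read (sketch):
// sc.conf.getOption(MesosConfigKeys.DRIVER_WEBUI_URL).orElse(sc.ui.map(_.appUIAddress))
```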
[GitHub] spark pull request #14414: [SPARK-16809] enable history server links in disp...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14414#discussion_r73032820

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -364,7 +377,12 @@ private[spark] class MesosClusterScheduler(
     val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
     val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
-    driverEnv ++ executorEnv ++ desc.command.environment
+    var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+    )
+
+
--- End diff --

Remove the new line?
[GitHub] spark pull request #14414: [SPARK-16809] enable history server links in disp...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14414#discussion_r73032511

--- Diff: core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala ---
@@ -28,10 +28,17 @@ import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
 import org.apache.spark.ui.{UIUtils, WebUIPage}

 private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
+  private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val state = parent.scheduler.getSchedulerState()
-    val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
-    val driverHeaders = queuedHeaders ++
+
+    val driverHeader = Seq("Driver ID")
--- End diff --

Why is a header a Seq?!
[GitHub] spark pull request #14414: [SPARK-16809] enable history server links in disp...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14414#discussion_r73032355

--- Diff: core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala ---
@@ -28,10 +28,17 @@ import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
 import org.apache.spark.ui.{UIUtils, WebUIPage}

 private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
+  private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val state = parent.scheduler.getSchedulerState()
-    val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
-    val driverHeaders = queuedHeaders ++
+
+    val driverHeader = Seq("Driver ID")
+    val historyHeader = if (historyServerURL.isDefined) Seq("History") else Nil
--- End diff --

`map.getOrElse`?
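A minimal sketch of the `map`/`getOrElse` form the reviewer hints at; `historyServerURL` below is a stand-in Option, not the value read from the dispatcher conf:

```scala
// Equivalent to: if (historyServerURL.isDefined) Seq("History") else Nil
val historyServerURL: Option[String] = Some("http://history-server:18080") // stand-in value
val historyHeader: Seq[String] = historyServerURL.map(_ => Seq("History")).getOrElse(Nil)
```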
[GitHub] spark pull request #14414: [SPARK-16809] enable history server links in disp...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14414#discussion_r73032190

--- Diff: core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala ---
@@ -28,10 +28,17 @@ import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
 import org.apache.spark.ui.{UIUtils, WebUIPage}

 private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
+  private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
--- End diff --

Would it be possible to define a constant for "spark.mesos.dispatcher.historyServer.url" (as is in SQL and YARN modules)? See https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala.
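A sketch in the style of the YARN `config.scala` linked above; the object name, constant name, and doc text are assumptions, not code from the PR:

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical holder object mirroring the ConfigBuilder pattern used in deploy/yarn/config.scala.
private[spark] object MesosDispatcherConfig {
  val HISTORY_SERVER_URL =
    ConfigBuilder("spark.mesos.dispatcher.historyServer.url")
      .doc("Base URL of the history server the dispatcher UI should link to.")
      .stringConf
      .createOptional
}
```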
[GitHub] spark pull request #14292: [SPARK-14131][SQL][STREAMING] Improved fix for avo...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r72148282

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---
@@ -247,6 +248,46 @@ private[sql] trait SQLTestUtils
       }
     }
   }
+
+  /** Run a test on a separate [[UninterruptibleThread]]. */
+  protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false)
+    (body: => Unit): Unit = {
+    val timeoutMillis = 1
+    var ex: Throwable = null
+
+    def runOnThread(): Unit = {
+      val thread = new UninterruptibleThread(s"Testing thread for test $name") {
+        override def run(): Unit = {
+          try {
+            body
+          } catch {
+            case NonFatal(e) =>
+              ex = e
+          }
+        }
+      }
+      thread.setDaemon(true)
+      thread.start()
+      thread.join(timeoutMillis)
+      if (thread.isAlive) {
+        thread.interrupt()
+        // If this interrupt does not work, then this thread is most likely running something that
+        // is not interruptible. There is not much point to wait for the thread to termniate, and
+        // we rather let the JVM terminate the thread on exit.
+        fail(
+          s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" +
+            s" $timeoutMillis ms")
+      } else if (ex != null) {
+        throw ex
+      }
+    }
+
+    if (quietly) {
--- End diff --

What about this?

```
val f = if (quietly) testQuietly(name) else test(name)
f(runOnThread())
```
[GitHub] spark pull request #14292: [SPARK-14131][SQL][STREAMING] Improved fix for avo...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r72148009

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -269,19 +273,11 @@ class StreamExecution(
    * batchId counter is incremented and a new log entry is written with the newest offsets.
    */
   private def constructNextBatch(): Unit = {
-    // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
-    // If we interrupt some thread running Shell.runCommand, we may hit this issue.
-    // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
-    // to set the file permission, we should not interrupt "microBatchThread" when running this
-    // method. See SPARK-14131.
-    //
     // Check to see what new data is available.
     val hasNewData = {
       awaitBatchLock.lock()
       try {
-        val newData = microBatchThread.runUninterruptibly {
-          uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-        }
+        val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
--- End diff --

Gave it a longer thought. I'm not using for comprehension very often, but when I do... What do you think about this?

```
val newData = for {
  source <- uniqueSources
  offset <- source.getOffset
} yield (source, offset)
```
[GitHub] spark pull request #14292: [SPARK-14131][SQL][STREAMING] Improved fix for avo...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r72137376

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -269,19 +273,11 @@ class StreamExecution(
    * batchId counter is incremented and a new log entry is written with the newest offsets.
    */
   private def constructNextBatch(): Unit = {
-    // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
-    // If we interrupt some thread running Shell.runCommand, we may hit this issue.
-    // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
-    // to set the file permission, we should not interrupt "microBatchThread" when running this
-    // method. See SPARK-14131.
-    //
     // Check to see what new data is available.
     val hasNewData = {
       awaitBatchLock.lock()
       try {
-        val newData = microBatchThread.runUninterruptibly {
-          uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-        }
+        val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
--- End diff --

I don't like it either, but... couldn't it be that the line is trying to do more than it really should? Perhaps the code should be two simpler functions composed? Just a wild thought... Don't wanna hold it back.
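One way that decomposition could look, purely as a sketch; `Source` and `Offset` are the streaming types already used in this file, and the helper name is made up:

```scala
import org.apache.spark.sql.execution.streaming.{Offset, Source}

// Hypothetical helper: pair a source with its current offset, if it reports one.
def availableOffset(source: Source): Option[(Source, Offset)] =
  source.getOffset.map(offset => source -> offset)

// The original one-liner then becomes:
// val newData = uniqueSources.flatMap(availableOffset)
```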
[GitHub] spark pull request #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning P...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14322#discussion_r71992661

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---
@@ -135,9 +135,17 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
         INPUT_PATHS -> fsRelation.location.paths.mkString(", "))

+      // If the required attributes does not have the partitioning columns, we do not need
+      // to scan the partitioning columns. If partitioning columns are selected, the column order
+      // of partitionColumns is fixed in rdd. Thus, we always scan all the partitioning columns.
+      val scannedColumns = if (requiredAttributes.intersect(partitionSet).nonEmpty) {
--- End diff --

What do you think about `map` and `getOrElse`?
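A rough illustration of the `map`/`getOrElse` shape being asked about; the names and types below are placeholders, not code from the PR:

```scala
// If any partition column is required, scan all columns; otherwise only the data columns.
def scannedColumns[A](requiredPartitionCols: Set[A], all: Seq[A], dataOnly: Seq[A]): Seq[A] =
  requiredPartitionCols.headOption.map(_ => all).getOrElse(dataOnly)

// Example: scannedColumns(Set("year"), Seq("id", "year"), Seq("id")) == Seq("id", "year")
```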
[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14326#discussion_r71992373 --- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala --- @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.regression + +import scala.collection.mutable + +import breeze.linalg.{DenseVector => BDV} +import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, LBFGSB => BreezeLBFGSB} + +import org.apache.spark.SparkException +import org.apache.spark.annotation.Since +import org.apache.spark.internal.Logging +import org.apache.spark.ml.PredictorParams +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg.{Vector, Vectors} +import org.apache.spark.ml.linalg.BLAS._ +import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.functions._ +import org.apache.spark.storage.StorageLevel + +/** + * Params for robust regression. + */ +private[regression] trait RobustRegressionParams extends PredictorParams with HasRegParam + with HasMaxIter with HasTol with HasFitIntercept with HasStandardization with HasWeightCol { + + /** + * The shape parameter to control the amount of robustness. Must be > 1.0. + * At larger values of M, the huber criterion becomes more similar to least squares regression; + * for small values of M, the criterion is more similar to L1 regression. + * Default is 1.35 to get as much robustness as possible while retaining + * 95% statistical efficiency for normally distributed data. + */ + @Since("2.1.0") + final val m = new DoubleParam(this, "m", "The shape parameter to control the amount of " + +"robustness. Must be > 1.0.", ParamValidators.gt(1.0)) + + /** @group getParam */ + @Since("2.1.0") + def getM: Double = $(m) +} + +/** + * Robust regression. + * + * The learning objective is to minimize the huber loss, with regularization. + * + * The robust regression optimizes the squared loss for the samples where + * {{{ |\frac{(y - X \beta)}{\sigma}|\leq M }}} + * and the absolute loss for the samples where + * {{{ |\frac{(y - X \beta)}{\sigma}|\geq M }}}, + * where \beta and \sigma are parameters to be optimized. + * + * This supports two types of regularization: None and L2. 
+ * + * This estimator is different from the R implementation of Robust Regression + * ([[http://www.ats.ucla.edu/stat/r/dae/rreg.htm]]) because the R implementation does a + * weighted least squares implementation with weights given to each sample on the basis + * of how much the residual is greater than a certain threshold. + */ +@Since("2.1.0") +class RobustRegression @Since("2.1.0") (@Since("2.1.0") override val uid: String) + extends Regressor[Vector, RobustRegression, RobustRegressionModel] + with RobustRegressionParams with Logging { + + @Since("2.1.0") --- End diff -- I don't think you need `@Since` at every symbol in the class (that was `@Since` itself with the same annotation). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14326#discussion_r71992352 --- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala --- @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.regression + +import scala.collection.mutable + +import breeze.linalg.{DenseVector => BDV} +import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, LBFGSB => BreezeLBFGSB} + +import org.apache.spark.SparkException +import org.apache.spark.annotation.Since +import org.apache.spark.internal.Logging +import org.apache.spark.ml.PredictorParams +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg.{Vector, Vectors} +import org.apache.spark.ml.linalg.BLAS._ +import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.functions._ +import org.apache.spark.storage.StorageLevel + +/** + * Params for robust regression. + */ +private[regression] trait RobustRegressionParams extends PredictorParams with HasRegParam + with HasMaxIter with HasTol with HasFitIntercept with HasStandardization with HasWeightCol { + + /** + * The shape parameter to control the amount of robustness. Must be > 1.0. + * At larger values of M, the huber criterion becomes more similar to least squares regression; + * for small values of M, the criterion is more similar to L1 regression. + * Default is 1.35 to get as much robustness as possible while retaining + * 95% statistical efficiency for normally distributed data. + */ + @Since("2.1.0") + final val m = new DoubleParam(this, "m", "The shape parameter to control the amount of " + +"robustness. Must be > 1.0.", ParamValidators.gt(1.0)) + + /** @group getParam */ + @Since("2.1.0") + def getM: Double = $(m) +} + +/** + * Robust regression. + * + * The learning objective is to minimize the huber loss, with regularization. + * + * The robust regression optimizes the squared loss for the samples where + * {{{ |\frac{(y - X \beta)}{\sigma}|\leq M }}} + * and the absolute loss for the samples where + * {{{ |\frac{(y - X \beta)}{\sigma}|\geq M }}}, + * where \beta and \sigma are parameters to be optimized. + * + * This supports two types of regularization: None and L2. 
+ * + * This estimator is different from the R implementation of Robust Regression + * ([[http://www.ats.ucla.edu/stat/r/dae/rreg.htm]]) because the R implementation does a + * weighted least squares implementation with weights given to each sample on the basis + * of how much the residual is greater than a certain threshold. + */ +@Since("2.1.0") +class RobustRegression @Since("2.1.0") (@Since("2.1.0") override val uid: String) --- End diff -- Are all `@Since` required? I'd think the one on line 82 would be enough. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14327: [SPARK-16686][SQL] Project shouldn't be pushed do...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14327#discussion_r71992303

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -422,6 +422,32 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       3, 17, 27, 58, 62)
   }
+
+  test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") {
+    val udfOne = spark.udf.register("udfOne", (n: Int) => {
+      require(n != 1, "udfOne shouldn't see swid=1!")
+      1
+    })
+
+    val d = Seq(
+      (0, "string0"),
+      (1, "string1"),
+      (2, "string2"),
+      (3, "string3"),
+      (4, "string4"),
+      (5, "string5"),
+      (6, "string6"),
+      (7, "string7"),
+      (8, "string8"),
+      (9, "string9")
+    )
+    val df = spark.createDataFrame(d).toDF("swid", "stringData")
+    val sampleDF = df.sample(false, 0.7, 50)
+    // After sampling, sampleDF doesn't contain swid=1.
+    assert(!sampleDF.select("swid").collect.contains(1))
+    // udfOne should not encounter swid=1.
+    sampleDF.select(udfOne($"swid")).collect
--- End diff --

I assume you're calling `collect` to trigger `assert`, aren't you? If so, why don't you return `true`/`false` to denote it and do `assert` here instead?
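A sketch of that suggestion, assuming the `sampleDF` built in the test above: the UDF reports whether it saw the value instead of throwing, and the assertion moves into the test body:

```scala
import org.apache.spark.sql.functions.udf

// Hypothetical variant: return a Boolean instead of calling require() inside the UDF.
val sawSwidOne = udf { n: Int => n == 1 }
val flags = sampleDF.select(sawSwidOne($"swid")).collect()
assert(!flags.exists(_.getBoolean(0)), "udfOne shouldn't see swid=1!")
```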
[GitHub] spark pull request #14327: [SPARK-16686][SQL] Project shouldn't be pushed do...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14327#discussion_r71992270

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -422,6 +422,32 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       3, 17, 27, 58, 62)
   }
+
+  test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") {
+    val udfOne = spark.udf.register("udfOne", (n: Int) => {
--- End diff --

Is there a reason why you use `spark.udf.register` and not `udf` directly?

```
val udfOne = udf { n: Int =>
  ...
}
```
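A completed form of that snippet, with the body taken from the test shown above (`udf` comes from `org.apache.spark.sql.functions`):

```scala
import org.apache.spark.sql.functions.udf

val udfOne = udf { n: Int =>
  require(n != 1, "udfOne shouldn't see swid=1!")
  1
}
```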
[GitHub] spark pull request #14327: [SPARK-16686][SQL] Project shouldn't be pushed do...
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14327#discussion_r71992254

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -422,6 +422,32 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       3, 17, 27, 58, 62)
   }
+
+  test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") {
+    val udfOne = spark.udf.register("udfOne", (n: Int) => {
+      require(n != 1, "udfOne shouldn't see swid=1!")
+      1
+    })
+
+    val d = Seq(
+      (0, "string0"),
+      (1, "string1"),
+      (2, "string2"),
+      (3, "string3"),
+      (4, "string4"),
+      (5, "string5"),
+      (6, "string6"),
+      (7, "string7"),
+      (8, "string8"),
+      (9, "string9")
+    )
+    val df = spark.createDataFrame(d).toDF("swid", "stringData")
--- End diff --

`d.toDF(...)` should work too, shouldn't it?
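The shorter form being suggested, assuming the suite's implicits (e.g. `import testImplicits._` or `import spark.implicits._`) are in scope so `.toDF` is available on local collections:

```scala
import spark.implicits._

val df = d.toDF("swid", "stringData")
```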
[GitHub] spark pull request #14327: [SPARK-16686][SQL] Project shouldn't be pushed do...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14327#discussion_r71992242 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -150,13 +150,20 @@ class SimpleTestOptimizer extends Optimizer( /** * Pushes projects down beneath Sample to enable column pruning with sampling. + * This rule is only doable when the projects don't add new attributes. */ object PushProjectThroughSample extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Push down projection into sample -case Project(projectList, Sample(lb, up, replace, seed, child)) => +case p @ Project(projectList, Sample(lb, up, replace, seed, child)) +if !hasNewOutput(projectList, p.child.output) => Sample(lb, up, replace, seed, Project(projectList, child))() } + private def hasNewOutput( + projectList: Seq[NamedExpression], + childOutput: Seq[Attribute]): Boolean = { +projectList.exists(p => !childOutput.exists(_.semanticEquals(p))) --- End diff -- It's hard to understand what the code does -- two `exists` and negation. Can you "untangle" it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
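One possible untangling, sketched under the assumption of the same catalyst imports as `Optimizer.scala`; the hypothetical helper name states the positive condition, which avoids reading through two `exists` calls and a negation (it is logically equivalent, since `!exists(!p)` is `forall(p)`):
```
// Hypothetical helper: the Project may be pushed below Sample only when every
// project expression matches some output attribute of the child.
private def onlyReferencesChildOutput(
    projectList: Seq[NamedExpression],
    childOutput: Seq[Attribute]): Boolean = {
  projectList.forall(p => childOutput.exists(_.semanticEquals(p)))
}

// The rule's guard then reads positively:
//   case p @ Project(projectList, Sample(lb, up, replace, seed, child))
//       if onlyReferencesChildOutput(projectList, p.child.output) => ...
```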
[GitHub] spark pull request #14313: [SPARK-16674][SQL] Avoid per-record type dispatch...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14313#discussion_r71977368 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala --- @@ -407,84 +495,8 @@ private[sql] class JDBCRDD( var i = 0 while (i < conversions.length) { --- End diff -- Why `while` not `foreach` or similar? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
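For illustration only, a sketch of the `foreach`-style shape asked about above, assuming `conversions` is the array of per-column converters built by `getConversions`; in per-row hot paths Spark frequently keeps the manual `while` loop for performance, so this is not presented as the better choice:
```
// Index-based iteration over the converters, one per column of the current row.
conversions.indices.foreach { i =>
  // apply conversions(i) to column i of the current row here, as the while loop does
}
```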
[GitHub] spark pull request #14313: [SPARK-16674][SQL] Avoid per-record type dispatch...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14313#discussion_r71977344 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala --- @@ -322,46 +322,134 @@ private[sql] class JDBCRDD( } } - // Each JDBC-to-Catalyst conversion corresponds to a tag defined here so that - // we don't have to potentially poke around in the Metadata once for every - // row. - // Is there a better way to do this? I'd rather be using a type that - // contains only the tags I define. - abstract class JDBCConversion - case object BooleanConversion extends JDBCConversion - case object DateConversion extends JDBCConversion - case class DecimalConversion(precision: Int, scale: Int) extends JDBCConversion - case object DoubleConversion extends JDBCConversion - case object FloatConversion extends JDBCConversion - case object IntegerConversion extends JDBCConversion - case object LongConversion extends JDBCConversion - case object BinaryLongConversion extends JDBCConversion - case object StringConversion extends JDBCConversion - case object TimestampConversion extends JDBCConversion - case object BinaryConversion extends JDBCConversion - case class ArrayConversion(elementConversion: JDBCConversion) extends JDBCConversion + // A `JDBCConversion` is responsible for converting a value from `ResultSet` + // to a value in a field for `InternalRow`. + private type JDBCConversion = (ResultSet, Int) => Any + + // This `ArrayElementConversion` is responsible for converting elements in + // an array from `ResultSet`. + private type ArrayElementConversion = (Object) => Any /** - * Maps a StructType to a type tag list. + * Maps a StructType to conversions for each type. */ def getConversions(schema: StructType): Array[JDBCConversion] = schema.fields.map(sf => getConversions(sf.dataType, sf.metadata)) private def getConversions(dt: DataType, metadata: Metadata): JDBCConversion = dt match { -case BooleanType => BooleanConversion -case DateType => DateConversion -case DecimalType.Fixed(p, s) => DecimalConversion(p, s) -case DoubleType => DoubleConversion -case FloatType => FloatConversion -case IntegerType => IntegerConversion -case LongType => if (metadata.contains("binarylong")) BinaryLongConversion else LongConversion -case StringType => StringConversion -case TimestampType => TimestampConversion -case BinaryType => BinaryConversion -case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata)) +case BooleanType => + (rs: ResultSet, pos: Int) => rs.getBoolean(pos) + +case DateType => + (rs: ResultSet, pos: Int) => +// DateTimeUtils.fromJavaDate does not handle null value, so we need to check it. 
+val dateVal = rs.getDate(pos) +if (dateVal != null) { + DateTimeUtils.fromJavaDate(dateVal) +} else { + null +} + +case DecimalType.Fixed(p, s) => + (rs: ResultSet, pos: Int) => +val decimalVal = rs.getBigDecimal(pos) +if (decimalVal == null) { + null +} else { + Decimal(decimalVal, p, s) +} + +case DoubleType => + (rs: ResultSet, pos: Int) => rs.getDouble(pos) + +case FloatType => + (rs: ResultSet, pos: Int) => rs.getFloat(pos) + +case IntegerType => + (rs: ResultSet, pos: Int) => rs.getInt(pos) + +case LongType if metadata.contains("binarylong") => + (rs: ResultSet, pos: Int) => +val bytes = rs.getBytes(pos) +var ans = 0L +var j = 0 +while (j < bytes.size) { + ans = 256 * ans + (255 & bytes(j)) + j = j + 1 +} +ans + +case LongType => + (rs: ResultSet, pos: Int) => rs.getLong(pos) + +case StringType => + (rs: ResultSet, pos: Int) => +// TODO(davies): use getBytes for better performance, if the encoding is UTF-8 +UTF8String.fromString(rs.getString(pos)) + +case TimestampType => + (rs: ResultSet, pos: Int) => +val t = rs.getTimestamp(pos) +if (t != null) { + DateTimeUtils.fromJavaTimestamp(t) +} else { + null +} + +case BinaryType => + (rs: ResultSet, pos: Int) => rs.getBytes(pos) + +case ArrayType(
[GitHub] spark pull request #14313: [SPARK-16674][SQL] Avoid per-record type dispatch...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14313#discussion_r71977337 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala --- @@ -322,46 +322,134 @@ private[sql] class JDBCRDD( } } - // Each JDBC-to-Catalyst conversion corresponds to a tag defined here so that - // we don't have to potentially poke around in the Metadata once for every - // row. - // Is there a better way to do this? I'd rather be using a type that - // contains only the tags I define. - abstract class JDBCConversion - case object BooleanConversion extends JDBCConversion - case object DateConversion extends JDBCConversion - case class DecimalConversion(precision: Int, scale: Int) extends JDBCConversion - case object DoubleConversion extends JDBCConversion - case object FloatConversion extends JDBCConversion - case object IntegerConversion extends JDBCConversion - case object LongConversion extends JDBCConversion - case object BinaryLongConversion extends JDBCConversion - case object StringConversion extends JDBCConversion - case object TimestampConversion extends JDBCConversion - case object BinaryConversion extends JDBCConversion - case class ArrayConversion(elementConversion: JDBCConversion) extends JDBCConversion + // A `JDBCConversion` is responsible for converting a value from `ResultSet` + // to a value in a field for `InternalRow`. + private type JDBCConversion = (ResultSet, Int) => Any + + // This `ArrayElementConversion` is responsible for converting elements in + // an array from `ResultSet`. + private type ArrayElementConversion = (Object) => Any /** - * Maps a StructType to a type tag list. + * Maps a StructType to conversions for each type. */ def getConversions(schema: StructType): Array[JDBCConversion] = schema.fields.map(sf => getConversions(sf.dataType, sf.metadata)) private def getConversions(dt: DataType, metadata: Metadata): JDBCConversion = dt match { -case BooleanType => BooleanConversion -case DateType => DateConversion -case DecimalType.Fixed(p, s) => DecimalConversion(p, s) -case DoubleType => DoubleConversion -case FloatType => FloatConversion -case IntegerType => IntegerConversion -case LongType => if (metadata.contains("binarylong")) BinaryLongConversion else LongConversion -case StringType => StringConversion -case TimestampType => TimestampConversion -case BinaryType => BinaryConversion -case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata)) +case BooleanType => + (rs: ResultSet, pos: Int) => rs.getBoolean(pos) + +case DateType => + (rs: ResultSet, pos: Int) => +// DateTimeUtils.fromJavaDate does not handle null value, so we need to check it. 
+val dateVal = rs.getDate(pos) +if (dateVal != null) { + DateTimeUtils.fromJavaDate(dateVal) +} else { + null +} + +case DecimalType.Fixed(p, s) => + (rs: ResultSet, pos: Int) => +val decimalVal = rs.getBigDecimal(pos) +if (decimalVal == null) { + null +} else { + Decimal(decimalVal, p, s) +} + +case DoubleType => + (rs: ResultSet, pos: Int) => rs.getDouble(pos) + +case FloatType => + (rs: ResultSet, pos: Int) => rs.getFloat(pos) + +case IntegerType => + (rs: ResultSet, pos: Int) => rs.getInt(pos) + +case LongType if metadata.contains("binarylong") => + (rs: ResultSet, pos: Int) => +val bytes = rs.getBytes(pos) +var ans = 0L +var j = 0 +while (j < bytes.size) { + ans = 256 * ans + (255 & bytes(j)) + j = j + 1 +} +ans + +case LongType => + (rs: ResultSet, pos: Int) => rs.getLong(pos) + +case StringType => + (rs: ResultSet, pos: Int) => +// TODO(davies): use getBytes for better performance, if the encoding is UTF-8 +UTF8String.fromString(rs.getString(pos)) + +case TimestampType => + (rs: ResultSet, pos: Int) => +val t = rs.getTimestamp(pos) +if (t != null) { --- End diff -- same as above --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so
[GitHub] spark pull request #14313: [SPARK-16674][SQL] Avoid per-record type dispatch...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14313#discussion_r71977329 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala --- @@ -322,46 +322,134 @@ private[sql] class JDBCRDD( } } - // Each JDBC-to-Catalyst conversion corresponds to a tag defined here so that - // we don't have to potentially poke around in the Metadata once for every - // row. - // Is there a better way to do this? I'd rather be using a type that - // contains only the tags I define. - abstract class JDBCConversion - case object BooleanConversion extends JDBCConversion - case object DateConversion extends JDBCConversion - case class DecimalConversion(precision: Int, scale: Int) extends JDBCConversion - case object DoubleConversion extends JDBCConversion - case object FloatConversion extends JDBCConversion - case object IntegerConversion extends JDBCConversion - case object LongConversion extends JDBCConversion - case object BinaryLongConversion extends JDBCConversion - case object StringConversion extends JDBCConversion - case object TimestampConversion extends JDBCConversion - case object BinaryConversion extends JDBCConversion - case class ArrayConversion(elementConversion: JDBCConversion) extends JDBCConversion + // A `JDBCConversion` is responsible for converting a value from `ResultSet` + // to a value in a field for `InternalRow`. + private type JDBCConversion = (ResultSet, Int) => Any + + // This `ArrayElementConversion` is responsible for converting elements in + // an array from `ResultSet`. + private type ArrayElementConversion = (Object) => Any /** - * Maps a StructType to a type tag list. + * Maps a StructType to conversions for each type. */ def getConversions(schema: StructType): Array[JDBCConversion] = schema.fields.map(sf => getConversions(sf.dataType, sf.metadata)) private def getConversions(dt: DataType, metadata: Metadata): JDBCConversion = dt match { -case BooleanType => BooleanConversion -case DateType => DateConversion -case DecimalType.Fixed(p, s) => DecimalConversion(p, s) -case DoubleType => DoubleConversion -case FloatType => FloatConversion -case IntegerType => IntegerConversion -case LongType => if (metadata.contains("binarylong")) BinaryLongConversion else LongConversion -case StringType => StringConversion -case TimestampType => TimestampConversion -case BinaryType => BinaryConversion -case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata)) +case BooleanType => + (rs: ResultSet, pos: Int) => rs.getBoolean(pos) + +case DateType => + (rs: ResultSet, pos: Int) => +// DateTimeUtils.fromJavaDate does not handle null value, so we need to check it. +val dateVal = rs.getDate(pos) +if (dateVal != null) { + DateTimeUtils.fromJavaDate(dateVal) +} else { + null +} + +case DecimalType.Fixed(p, s) => + (rs: ResultSet, pos: Int) => +val decimalVal = rs.getBigDecimal(pos) +if (decimalVal == null) { --- End diff -- Same as above (plus you're checking equality with `null` opposite to the above -- consistency violated) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14313: [SPARK-16674][SQL] Avoid per-record type dispatch...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14313#discussion_r71977310 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala --- @@ -322,46 +322,134 @@ private[sql] class JDBCRDD( } } - // Each JDBC-to-Catalyst conversion corresponds to a tag defined here so that - // we don't have to potentially poke around in the Metadata once for every - // row. - // Is there a better way to do this? I'd rather be using a type that - // contains only the tags I define. - abstract class JDBCConversion - case object BooleanConversion extends JDBCConversion - case object DateConversion extends JDBCConversion - case class DecimalConversion(precision: Int, scale: Int) extends JDBCConversion - case object DoubleConversion extends JDBCConversion - case object FloatConversion extends JDBCConversion - case object IntegerConversion extends JDBCConversion - case object LongConversion extends JDBCConversion - case object BinaryLongConversion extends JDBCConversion - case object StringConversion extends JDBCConversion - case object TimestampConversion extends JDBCConversion - case object BinaryConversion extends JDBCConversion - case class ArrayConversion(elementConversion: JDBCConversion) extends JDBCConversion + // A `JDBCConversion` is responsible for converting a value from `ResultSet` + // to a value in a field for `InternalRow`. + private type JDBCConversion = (ResultSet, Int) => Any + + // This `ArrayElementConversion` is responsible for converting elements in + // an array from `ResultSet`. + private type ArrayElementConversion = (Object) => Any /** - * Maps a StructType to a type tag list. + * Maps a StructType to conversions for each type. */ def getConversions(schema: StructType): Array[JDBCConversion] = schema.fields.map(sf => getConversions(sf.dataType, sf.metadata)) private def getConversions(dt: DataType, metadata: Metadata): JDBCConversion = dt match { -case BooleanType => BooleanConversion -case DateType => DateConversion -case DecimalType.Fixed(p, s) => DecimalConversion(p, s) -case DoubleType => DoubleConversion -case FloatType => FloatConversion -case IntegerType => IntegerConversion -case LongType => if (metadata.contains("binarylong")) BinaryLongConversion else LongConversion -case StringType => StringConversion -case TimestampType => TimestampConversion -case BinaryType => BinaryConversion -case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata)) +case BooleanType => + (rs: ResultSet, pos: Int) => rs.getBoolean(pos) + +case DateType => + (rs: ResultSet, pos: Int) => +// DateTimeUtils.fromJavaDate does not handle null value, so we need to check it. +val dateVal = rs.getDate(pos) +if (dateVal != null) { --- End diff -- `Option(dateVal).map(...).orNull`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
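A sketch of the `Option`-based variant suggested above; one wrinkle is that `DateTimeUtils.fromJavaDate` returns a primitive `Int`, so the mapped value has to be widened (here to `Any`, the conversion's result type) before `orNull` compiles, which is arguably a point in favour of the explicit null check:
```
import java.sql.ResultSet
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Null-safe date conversion in the Option style: Option(...) absorbs a possibly
// null java.sql.Date, and orNull restores null when the column was SQL NULL.
val dateConversion: (ResultSet, Int) => Any =
  (rs: ResultSet, pos: Int) =>
    Option(rs.getDate(pos))
      .map(d => DateTimeUtils.fromJavaDate(d): Any)
      .orNull
```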
[GitHub] spark pull request #14295: [SPARK-16648][SQL] Overrides TreeNode.withNewChil...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14295#discussion_r71908714 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala --- @@ -42,6 +42,17 @@ case class Last(child: Expression, ignoreNullsExpr: Expression) extends Declarat override def children: Seq[Expression] = child :: Nil + // SPARK-16648: Default `TreeNode.withNewChildren` implementation doesn't work for `Last` when --- End diff -- Make it so? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14309: [SPARK-11977][SQL] Support accessing a column con...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14309#discussion_r7196 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -641,6 +641,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row(key, value, key + 1) }.toSeq) assert(df.schema.map(_.name) === Seq("key", "valueRenamed", "newCol")) + +// Renaming to a column that contains "." character +val df2 = testData.toDF().withColumnRenamed("value", "value.Renamed") --- End diff -- No need for `()` in `toDF()`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14309: [SPARK-11977][SQL] Support accessing a column con...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14309#discussion_r7137 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -641,6 +641,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row(key, value, key + 1) }.toSeq) assert(df.schema.map(_.name) === Seq("key", "valueRenamed", "newCol")) + +// Renaming to a column that contains "." character +val df2 = testData.toDF().withColumnRenamed("value", "value.Renamed") +assert(df2.schema.map(_.name) === Seq("key", "value.Renamed")) --- End diff -- `df2.schema.fieldNames`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
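For completeness, the shape suggested above; `schema.fieldNames` already returns the names as an `Array[String]`, so the `map(_.name)` disappears (converted to a `Seq` here only to keep the comparison unambiguous):
```
assert(df2.schema.fieldNames.toSeq === Seq("key", "value.Renamed"))
```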
[GitHub] spark pull request #14309: [SPARK-11977][SQL] Support accessing a column con...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14309#discussion_r71888979 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -641,6 +641,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row(key, value, key + 1) }.toSeq) assert(df.schema.map(_.name) === Seq("key", "valueRenamed", "newCol")) --- End diff -- I'd fix that line, too with `fieldNames` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #14307: [SPARK-16672][SQL] SQLBuilder should not raise exception...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/14307 I didn't even know such a SQL query is possible in SQL :) Lots to learn still. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #14305: Spark-16669:Adding partition prunning to Metastore stati...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/14305 I think the PR is completely broken and should be recreated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14302: [SPARK-16663][SQL] desc table should be consisten...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14302#discussion_r71886372 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala --- @@ -41,13 +41,13 @@ class HiveOperatorQueryableSuite extends QueryTest with TestHiveSingleton { checkAnswer( sql("select * from mydesc"), Seq( -Row("key", "int", null), -Row("value", "string", null))) +Row("key", "int", ""), --- End diff -- Wish there were a "better" variant of `Row` as we traded `null`s to `""` :( --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14302: [SPARK-16663][SQL] desc table should be consisten...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14302#discussion_r71885483 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -436,11 +436,13 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { if (DDLUtils.isDatasourceTable(table)) { - val partCols = DDLUtils.getPartitionColumnsFromTableProperties(table) - if (partCols.nonEmpty) { + val schema = DDLUtils.getSchemaFromTableProperties(table) + val partColNames = DDLUtils.getPartitionColumnsFromTableProperties(table) + if (partColNames.nonEmpty && schema.isDefined) { append(buffer, "# Partition Information", "", "") -append(buffer, s"# ${output.head.name}", "", "") -partCols.foreach(col => append(buffer, col, "", "")) +append(buffer, s"# ${output.head.name}", output(1).name, output(2).name) +val partCols = partColNames.map(n => schema.get.find(_.name == n).get) --- End diff -- What do you think about this? ``` val s = schema.get s.fieldNames.intersect(partColNames).map(s.apply) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
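Sketching both shapes side by side for comparison; one behavioural note is that `intersect` preserves the order of its receiver, so the suggested version lists partition columns in schema order and silently drops names missing from the schema, while the original preserves `partColNames` order and fails fast on a missing name:
```
// Original shape: lookup in partition-column order, throwing if a name is missing.
val originalPartCols = partColNames.map(n => schema.get.find(_.name == n).get)

// Suggested shape: schema order, unknown names silently dropped.
val s = schema.get
val intersectedPartCols = s.fieldNames.intersect(partColNames).map(s.apply)
```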
[GitHub] spark pull request #14295: [SPARK-16648][SQL] Overrides TreeNode.withNewChil...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14295#discussion_r71871902 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala --- @@ -42,6 +42,17 @@ case class Last(child: Expression, ignoreNullsExpr: Expression) extends Declarat override def children: Seq[Expression] = child :: Nil + // SPARK-16648: Default `TreeNode.withNewChildren` implementation doesn't work for `Last` when + // both constructor arguments are the same, e.g.: + // + // LAST_VALUE(FALSE) // The 2nd argument defaults to FALSE + // LAST_VALUE(FALSE, FALSE) + // LAST_VALUE(TRUE, TRUE) + override def withNewChildren(newChildren: Seq[Expression]): Expression = { +val Seq(newChild) = newChildren --- End diff -- 👍 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14295: [SPARK-16648][SQL] Overrides TreeNode.withNewChil...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14295#discussion_r71871696 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala --- @@ -42,6 +42,17 @@ case class Last(child: Expression, ignoreNullsExpr: Expression) extends Declarat override def children: Seq[Expression] = child :: Nil + // SPARK-16648: Default `TreeNode.withNewChildren` implementation doesn't work for `Last` when --- End diff -- Use `[[` to reference code symbols. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14292#discussion_r71871490 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala --- @@ -247,6 +248,46 @@ private[sql] trait SQLTestUtils } } } + + /** Run a test on a separate [[UninterruptibleThread]]. */ + protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false) +(body: => Unit): Unit = { +val timeoutMillis = 1 +var ex: Throwable = null + +def runOnThread(): Unit = { + val thread = new UninterruptibleThread(s"Testing thread for test $name") { +override def run(): Unit = { + try { +body + } catch { +case NonFatal(e) => + ex = e + } +} + } + thread.setDaemon(true) + thread.start() + thread.join(timeoutMillis) + if (thread.isAlive) { +thread.interrupt() +// If this interrupt does not work, then this thread is most likely running something that +// is not interruptible. There is not much point to wait for the thread to termniate, and +// we rather let the JVM terminate the thread on exit. +fail( + s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" + +s" $timeoutMillis ms") + } else if (ex != null) { +throw ex + } +} + +if (quietly) { --- End diff -- I'd appreciate your comment on the following alternative: ``` val f = if (quietly) testQuietly else test f(name) { runOnThread() } ``` ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
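The alternative proposed above does not eta-expand directly, because ScalaTest's `test` takes a `Tag*` vararg and both registrars take by-name bodies (which cannot appear in a function type), but a thunk-based sketch captures the same idea, assuming `test` and `testQuietly` as defined in the surrounding trait and `name`, `quietly`, and `runOnThread` from the quoted method:
```
// Pick the registrar once, then register the test with a thunked body.
val register: (String, () => Unit) => Unit =
  if (quietly) (n, body) => testQuietly(n)(body())
  else (n, body) => test(n)(body())

register(name, () => runOnThread())
```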
[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14292#discussion_r71871337 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala --- @@ -247,6 +248,46 @@ private[sql] trait SQLTestUtils } } } + + /** Run a test on a separate [[UninterruptibleThread]]. */ + protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false) +(body: => Unit): Unit = { +val timeoutMillis = 1 +var ex: Throwable = null + +def runOnThread(): Unit = { + val thread = new UninterruptibleThread(s"Testing thread for test $name") { +override def run(): Unit = { + try { +body + } catch { +case NonFatal(e) => + ex = e --- End diff -- Will it work?! You're on another thread here and closing over `ex`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
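For reference: closing over `ex` does work here, because `thread.join` establishes a happens-before edge, so the worker thread's write is visible after the join. If that is considered too subtle, a sketch of a more explicit hand-off via `AtomicReference` (plain `Thread` is used only to keep the sketch self-contained; the real code uses `UninterruptibleThread`, and `name`, `body`, and `timeoutMillis` come from the enclosing method):
```
import java.util.concurrent.atomic.AtomicReference
import scala.util.control.NonFatal

val failure = new AtomicReference[Throwable](null)

val thread = new Thread(s"Testing thread for test $name") {
  override def run(): Unit = {
    try body catch {
      case NonFatal(e) => failure.set(e) // publish the failure for the main thread
    }
  }
}
thread.setDaemon(true)
thread.start()
thread.join(timeoutMillis)

// Visible here because join() happens-after the worker thread's writes.
Option(failure.get()).foreach(e => throw e)
```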
[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14292#discussion_r71871037 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -269,19 +273,11 @@ class StreamExecution( * batchId counter is incremented and a new log entry is written with the newest offsets. */ private def constructNextBatch(): Unit = { -// There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). -// If we interrupt some thread running Shell.runCommand, we may hit this issue. -// As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand" -// to set the file permission, we should not interrupt "microBatchThread" when running this -// method. See SPARK-14131. -// // Check to see what new data is available. val hasNewData = { awaitBatchLock.lock() try { -val newData = microBatchThread.runUninterruptibly { - uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) -} +val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) --- End diff -- Just a single line but takes a while to figure out what it does. I'd rewrite it to: ``` uniqueSources.map(s => (s, s.getOffset))... ``` and would do more transformation depending on the types (didn't check in IDE) Just an idea to untangle the knots :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
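A sketch of the two-step rewrite floated above, assuming `uniqueSources: Seq[Source]` and `Source.getOffset: Option[Offset]` as in `StreamExecution`; pairing first and then collecting the defined offsets makes each step's intent explicit:
```
// Step 1: ask every source for its current offset, keeping the pairing visible.
val sourceOffsets: Seq[(Source, Option[Offset])] =
  uniqueSources.map(s => s -> s.getOffset)

// Step 2: keep only the sources that actually reported an offset.
val newData: Seq[(Source, Offset)] = sourceOffsets.collect {
  case (source, Some(offset)) => source -> offset
}
```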
[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14292#discussion_r71870635 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala --- @@ -91,18 +92,30 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) serializer.deserialize[T](ByteBuffer.wrap(bytes)) } + /** + * Store the metadata for the specified batchId and return `true` if successful. If the batchId's + * metadata has already been stored, this method will return `false`. + * + * Note that this method must be called on a [[org.apache.spark.util.UninterruptibleThread]] + * so that interrupts can be disabled while writing the batch file. This is because there is a + * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread + * running "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our + * case, `writeBatch` creates a file using HDFS API and calls "Shell.runCommand" to set the + * file permissions, and can get deadlocked is the stream execution thread is stopped by + * interrupt. Hence, we make sure that this method is called on UninterruptibleThread which --- End diff -- `[[org.apache.spark.util.UninterruptibleThread]]` (as you do below) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14292#discussion_r71870509 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala --- @@ -91,18 +92,30 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) serializer.deserialize[T](ByteBuffer.wrap(bytes)) } + /** + * Store the metadata for the specified batchId and return `true` if successful. If the batchId's + * metadata has already been stored, this method will return `false`. + * + * Note that this method must be called on a [[org.apache.spark.util.UninterruptibleThread]] + * so that interrupts can be disabled while writing the batch file. This is because there is a + * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread + * running "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our + * case, `writeBatch` creates a file using HDFS API and calls "Shell.runCommand" to set the + * file permissions, and can get deadlocked is the stream execution thread is stopped by + * interrupt. Hence, we make sure that this method is called on UninterruptibleThread which + * allows use disable interrupts. Also see SPARK-14131. --- End diff -- "allow use disable interrupts"? Is this ok? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14292#discussion_r71870459 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala --- @@ -91,18 +92,30 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) serializer.deserialize[T](ByteBuffer.wrap(bytes)) } + /** + * Store the metadata for the specified batchId and return `true` if successful. If the batchId's + * metadata has already been stored, this method will return `false`. + * + * Note that this method must be called on a [[org.apache.spark.util.UninterruptibleThread]] + * so that interrupts can be disabled while writing the batch file. This is because there is a + * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread + * running "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our + * case, `writeBatch` creates a file using HDFS API and calls "Shell.runCommand" to set the + * file permissions, and can get deadlocked is the stream execution thread is stopped by --- End diff -- s/is/if --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14304: [SPARK-16668][TEST] Test parquet reader for row g...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14304#discussion_r71844762 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala --- @@ -78,4 +78,30 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex }} } } + + test("Read row group containing both dictionary and plain encoded pages") { +spark.conf.set("parquet.dictionary.page.size", "2048") +spark.conf.set("parquet.page.size", "4096") + +withTempPath { dir => + // In order to explicitly test for SPARK-14217, we set the parquet dictionary and page size + // such that the following data spans across 3 pages (within a single row group) where the + // first page is dictionary encoded and the remaining two are plain encoded. + val data = (0 until 512).flatMap(i => Seq.fill(3)(i.toString)) + data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath) + val file = + SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head.asInstanceOf[String] + + val reader = new VectorizedParquetRecordReader + reader.initialize(file, null /* set columns to null to project all columns */) + val column = reader.resultBatch().column(0) + assert(reader.nextBatch()) + + (0 until 512).foreach { i => +assert(column.getUTF8String(3 * i).toString == i.toString) --- End diff -- What about `toInt` as follows: ``` assert(column.getUTF8String(3 * i).toInt == i) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14304: [SPARK-16668][TEST] Test parquet reader for row g...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14304#discussion_r71844632 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala --- @@ -78,4 +78,30 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex }} } } + + test("Read row group containing both dictionary and plain encoded pages") { +spark.conf.set("parquet.dictionary.page.size", "2048") +spark.conf.set("parquet.page.size", "4096") + +withTempPath { dir => + // In order to explicitly test for SPARK-14217, we set the parquet dictionary and page size + // such that the following data spans across 3 pages (within a single row group) where the + // first page is dictionary encoded and the remaining two are plain encoded. + val data = (0 until 512).flatMap(i => Seq.fill(3)(i.toString)) + data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath) + val file = + SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head.asInstanceOf[String] + + val reader = new VectorizedParquetRecordReader + reader.initialize(file, null /* set columns to null to project all columns */) --- End diff -- I meant `initialize(file, columns = null)` or even: ``` val projectAllColumns = null initialize(file, projectAllColumns) ``` So you code what your intention is (without extra comments). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
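A sketch of the second variant; a named argument is unlikely to compile here since `initialize` is Java-defined, but a descriptively named local carries the same intent, assuming the `initialize(String, java.util.List[String])` overload used in the quoted test:
```
// null means "project all columns" for the vectorized reader; naming the value
// documents that at the call site without an inline comment.
val projectAllColumns: java.util.List[String] = null
reader.initialize(file, projectAllColumns)
```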
[GitHub] spark issue #14315: [HOTFIX][BUILD][SPARK-16287][SQL] Fix annotation argumen...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/14315 I'm on Java 8. I bet you are not. True? Also, I remembered how to fix it since this kind of error has happened a few times in the past. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #14315: [HOTFIX][BUILD][SPARK-16287][SQL] Fix annotation argumen...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/14315 /cc @cloud-fan @techaddict --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14315: [HOTFIX] Fix annotation argument needs to be a co...
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/spark/pull/14315 [HOTFIX] Fix annotation argument needs to be a constant ## What changes were proposed in this pull request? Fix for compilation error: ``` /Users/jacek/dev/oss/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala:402: error: annotation argument needs to be a constant; found: "_FUNC_(text[, pairDelim, keyValueDelim]) - Creates a map after splitting the text ".+("into key/value pairs using delimiters. ").+("Default delimiters are \',\' for pairDelim and \':\' for keyValueDelim.") "into key/value pairs using delimiters. " + ^ ``` ## How was this patch tested? Local build You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/spark build-fix-complexTypeCreator Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14315.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14315 commit ee34380a532ca8b5df5f938a0eba51c94a973dff Author: Jacek Laskowski Date: 2016-07-22T05:24:08Z [HOTFIX] Fix annotation argument needs to be a constant --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14284: [SPARK-16633] [SPARK-16642] Fixes three issues re...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14284#discussion_r71781038 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala --- @@ -357,14 +356,59 @@ class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSi } test("SPARK-7595: Window will cause resolve failed with self join") { -sql("SELECT * FROM src") // Force loading of src table. - checkAnswer(sql( """ |with -| v1 as (select key, count(value) over (partition by key) cnt_val from src), +| v0 as (select 0 as key, 1 as value), +| v1 as (select key, count(value) over (partition by key) cnt_val from v0), | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key) -| select * from v2 order by key limit 1 - """.stripMargin), Row(0, 3)) +| select key, cnt_val from v2 order by key limit 1 + """.stripMargin), Row(0, 1)) + } + + test("lead/lag should return the default value if the offset row does not exist") { +checkAnswer(sql( + """ +|SELECT +| lag(123, 100, 321) OVER (ORDER BY id) as lag, +| lead(123, 100, 321) OVER (ORDER BY id) as lead +|FROM (SELECT 1 as id) tmp + """.stripMargin), + Row(321, 321)) + +checkAnswer(sql( + """ +|SELECT +| lag(123, 100, a) OVER (ORDER BY id) as lag, +| lead(123, 100, a) OVER (ORDER BY id) as lead +|FROM (SELECT 1 as id, 2 as a) tmp + """.stripMargin), + Row(2, 2)) + } + + test("lead/lag should be able to handle null input value correctly") { --- End diff -- I don't think "correctly" is needed here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14304: [SPARK-16668][TEST] Test parquet reader for row g...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14304#discussion_r71780859 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala --- @@ -78,4 +78,29 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex }} } } + + test("Read row group containing both dictionary and plain encoded pages") { +spark.conf.set("parquet.dictionary.page.size", "2048") +spark.conf.set("parquet.page.size", "4096") + +withTempPath { dir => + // In order to explicitly test for SPARK-14217, we set the parquet dictionary and page size + // such that the following data spans across 3 pages (within a single row group) where the + // first page is dictionary encoded and the remaining two are plain encoded. + val data = (0 until 512).flatMap(i => List(i.toString, i.toString, i.toString)) --- End diff -- What do you think about `Seq.fill(3)(i.toString)`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14285: [SPARK-16649][SQL] Push partition predicates down...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14285#discussion_r71776877 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala --- @@ -632,6 +632,23 @@ class SessionCatalog( } /** + * Returns partitions filtered by predicates for the given table, It just work for Hive. --- End diff -- Why do you copy the scaladoc? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14285: [SPARK-16649][SQL] Push partition predicates down...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14285#discussion_r71776725 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala --- @@ -632,6 +632,23 @@ class SessionCatalog( } /** + * Returns partitions filtered by predicates for the given table, It just work for Hive. --- End diff -- Same as above --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14285: [SPARK-16649][SQL] Push partition predicates down...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14285#discussion_r71776630 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala --- @@ -167,6 +168,21 @@ abstract class ExternalCatalog { table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] + /** + * Returns partitions filtered by predicates for the given table, It just work for Hive. + * + * The filters Expressions may optionally be provided to filter the partitions returned. + * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), + * then the filters (a='1') will return the first two only. + * @param db database name --- End diff -- New line before `@param`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14285: [SPARK-16649][SQL] Push partition predicates down...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14285#discussion_r71776459 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala --- @@ -167,6 +168,21 @@ abstract class ExternalCatalog { table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] + /** + * Returns partitions filtered by predicates for the given table, It just work for Hive. --- End diff -- ". It just works" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14304: [SPARK-16668][TEST] Test parquet reader for row g...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14304#discussion_r71774436 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala --- @@ -78,4 +78,29 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex }} } } + + test("Read row group containing both dictionary and plain encoded pages") { +spark.conf.set("parquet.dictionary.page.size", "2048") +spark.conf.set("parquet.page.size", "4096") + +withTempPath { dir => + // In order to explicitly test for SPARK-14217, we set the parquet dictionary and page size + // such that the following data spans across 3 pages (within a single row group) where the + // first page is dictionary encoded and the remaining two are plain encoded. + val data = (0 until 512).flatMap(i => List(i.toString, i.toString, i.toString)) + data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath) + val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head + + val reader = new VectorizedParquetRecordReader + reader.initialize(file.asInstanceOf[String], null) + val batch = reader.resultBatch() + assert(reader.nextBatch()) + + (0 until 512).foreach { i => +assert(batch.column(0).getUTF8String(3 * i).toString == i.toString) --- End diff -- Two things here: 1. Create column in line 96 (`batch.column(0)`). 2. Since you convert `toString`, what do you think about `toInt` instead (since `i` is `Int` anyway). One conversion less :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
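A sketch combining the two points above: hoist the column lookup out of the loop and compare numerically. Going through `toString.toInt` keeps the sketch independent of whatever conversion methods `UTF8String` exposes in this version:
```
val column = batch.column(0)
(0 until 512).foreach { i =>
  // Each value was written three times, so the i-th distinct value sits at 3 * i.
  assert(column.getUTF8String(3 * i).toString.toInt == i)
}
```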
[GitHub] spark pull request #14304: [SPARK-16668][TEST] Test parquet reader for row g...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14304#discussion_r71773933 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala --- @@ -78,4 +78,29 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex }} } } + + test("Read row group containing both dictionary and plain encoded pages") { +spark.conf.set("parquet.dictionary.page.size", "2048") +spark.conf.set("parquet.page.size", "4096") + +withTempPath { dir => + // In order to explicitly test for SPARK-14217, we set the parquet dictionary and page size + // such that the following data spans across 3 pages (within a single row group) where the + // first page is dictionary encoded and the remaining two are plain encoded. + val data = (0 until 512).flatMap(i => List(i.toString, i.toString, i.toString)) + data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath) + val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head + + val reader = new VectorizedParquetRecordReader + reader.initialize(file.asInstanceOf[String], null) --- End diff -- What do you think about moving this `asInstanceOf` to line 92 and using a name parameter for `null`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14207: [SPARK-16552] [SQL] [WIP] Store the Inferred Sche...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14207#discussion_r71083334 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala --- @@ -351,6 +353,44 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** + * Refresh the inferred schema stored in the external catalog for data source tables. + */ + private def refreshInferredSchema(tableIdent: TableIdentifier): Unit = { +val table = sessionCatalog.getTableMetadataOption(tableIdent) +table.foreach { tableDesc => + if (DDLUtils.isDatasourceTable(tableDesc) && DDLUtils.isSchemaInferred(tableDesc)) { +val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(tableDesc) +val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(tableDesc) +val dataSource = + DataSource( +sparkSession, +userSpecifiedSchema = None, +partitionColumns = partitionColumns, +bucketSpec = bucketSpec, +className = tableDesc.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER), +options = tableDesc.storage.serdeProperties) +.resolveRelation().asInstanceOf[HadoopFsRelation] + +val schemaProperties = new mutable.HashMap[String, String] +CreateDataSourceTableUtils.saveSchema( + sparkSession, dataSource.schema, dataSource.partitionSchema.fieldNames, schemaProperties) + +val tablePropertiesWithoutSchema = tableDesc.properties.filterKeys { k => + // Keep the properties that are not for schema or partition columns + k != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS && --- End diff -- It's hard to know what the code's doing inside `filterKeys` -- consider creating a predicate function with a proper name. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
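Something along these lines is presumably what is meant; the predicate name is made up and the remaining property-key checks from the patch are elided, so treat it as a sketch.

```scala
// Sketch only: give the filterKeys predicate a descriptive name.
def isNotSchemaOrPartitionProperty(key: String): Boolean = {
  key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS // && ... the other schema/partition keys
}

val tablePropertiesWithoutSchema =
  tableDesc.properties.filterKeys(isNotSchemaOrPartitionProperty)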
[GitHub] spark pull request #14122: [SPARK-16470][ML][Optimizer] Check linear regress...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14122#discussion_r71083220 --- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala --- @@ -327,6 +327,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String throw new SparkException(msg) } + if (!state.actuallyConverged) { +logWarning("LinearRegression training fininshed but the result " + --- End diff -- @srowen There's a typo in `fininshed` :( --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14207: [SPARK-16552] [SQL] [WIP] Store the Inferred Sche...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14207#discussion_r71083304 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -487,6 +487,10 @@ object DDLUtils { isDatasourceTable(table.properties) } + def isSchemaInferred(table: CatalogTable): Boolean = { +table.properties.get(DATASOURCE_SCHEMA_TYPE) == Option(SchemaType.INFERRED.name) --- End diff -- Consider `contains`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
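For reference, `Option.contains` (available since Scala 2.11, which Spark 2.0 builds against) reduces the comparison to a single call; a sketch:

```scala
def isSchemaInferred(table: CatalogTable): Boolean = {
  // Option.contains avoids building an Option just for the equality check.
  table.properties.get(DATASOURCE_SCHEMA_TYPE).contains(SchemaType.INFERRED.name)
}
```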
[GitHub] spark pull request #14210: [SPARK-16556] [SPARK-16559] [SQL] Fix Two Bugs in...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14210#discussion_r71072892 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -1264,6 +1265,29 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + test("create table using cluster by without schema specification") { --- End diff -- s/cluster/clustered? Even an uppercase variant CLUSTERED BY as in the other descriptions. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14210: [SPARK-16556] [SPARK-16559] [SQL] Fix Two Bugs in...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14210#discussion_r71072862 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala --- @@ -199,7 +200,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with } } - test("create table using as select - with bucket") { + test("create table using as select - with non-zero bucket") { --- End diff -- s/bucket/buckets? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14210: [SPARK-16556] [SPARK-16559] [SQL] Fix Two Bugs in...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14210#discussion_r71072854 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala --- @@ -212,7 +213,23 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with ) val table = catalog.getTableMetadata(TableIdentifier("t")) assert(DDLUtils.getBucketSpecFromTableProperties(table) == -Some(BucketSpec(5, Seq("a"), Seq("b" +Option(BucketSpec(5, Seq("a"), Seq("b" +} + } + + test("create table using as select - with zero bucket") { --- End diff -- s/bucket/buckets? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14210: [SPARK-16556] [SPARK-16559] [SQL] Fix Two Bugs in...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14210#discussion_r71072842 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -1264,6 +1265,29 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + test("create table using cluster by without schema specification") { +import testImplicits._ +withTempPath { tempDir => + withTable("jsonTable") { +(("a", "b") :: Nil).toDF().toJSON.rdd.saveAsTextFile(tempDir.getCanonicalPath) --- End diff -- Why don't you use `toDF.write.json` instead? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
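That is, assuming the rest of the test stays as in the diff, something like the following sketch instead of going through `toJSON.rdd.saveAsTextFile`:

```scala
// Sketch only: write the JSON file through the DataFrameWriter.
(("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
```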
[GitHub] spark pull request #14217: [SPARK-16562][SQL] Do not allow downcast in INT32...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14217#discussion_r71072773 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala --- @@ -169,6 +169,19 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } + + test("SPARK-16562 Do not allow downcast in INT32 based types for normal Parquet reader") { +withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { + withTempPath { file => +(1 to 4).map(Tuple1(_)).toDF("a").write.parquet(file.getAbsolutePath) --- End diff -- Why do you `map(Tuple1(_))`? Why not `(1 to 4).toDF("a")` instead? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
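Since a `Range` is already a `Seq[Int]`, the suggested form should work directly with the suite's implicits in scope; a sketch:

```scala
// Sketch only: no intermediate Tuple1 wrapping needed for a single Int column.
(1 to 4).toDF("a").write.parquet(file.getAbsolutePath)
```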
[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13526#discussion_r69986596 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -312,6 +312,17 @@ class DatasetSuite extends QueryTest with SharedSQLContext { "a", "30", "b", "3", "c", "1") } + test("groupBy function, mapValues, flatMap") { +val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS() --- End diff -- Just `.toDS`? (no brackets) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13526#discussion_r69986532 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -65,6 +65,46 @@ class KeyValueGroupedDataset[K, V] private[sql]( groupingAttributes) /** + * Returns a new [[KeyValueGroupedDataset]] where the given function has been applied to the + * data. The grouping key is unchanged by this. + * + * {{{ + * // Create values grouped by key from a Dataset[(K, V)] + * ds.groupByKey(_._1).mapValues(_._2) // Scala + * }}} + * @since 2.0.0 + */ + def mapValues[W: Encoder](func: V => W): KeyValueGroupedDataset[K, W] = { +val withNewData = AppendColumns(func, dataAttributes, logicalPlan) +val projected = Project(withNewData.newColumns ++ groupingAttributes, withNewData) +val executed = sparkSession.sessionState.executePlan(projected) + +new KeyValueGroupedDataset( + encoderFor[K], + encoderFor[W], + executed, + withNewData.newColumns, + groupingAttributes) + } + + /** + * Returns a new [[KeyValueGroupedDataset]] where the given function has been applied to the + * data. The grouping key is unchanged by this. + * + * {{{ + * // Create Integer values grouped by String key from a Dataset> + * Dataset> ds = ...; + * KeyValueGroupedDataset grouped = + * ds.groupByKey(t -> t._1, Encoders.STRING()).mapValues(t -> t._2, Encoders.INT()); // Java 8 + * }}} + * @since 2.0.0 + */ + def mapValues[W](func: MapFunction[V, W], encoder: Encoder[W]): KeyValueGroupedDataset[K, W] = { +implicit val uEnc = encoder +mapValues{ (v: V) => func.call(v) } --- End diff -- A space before `{`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13526#discussion_r69986479 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -65,6 +65,46 @@ class KeyValueGroupedDataset[K, V] private[sql]( groupingAttributes) /** + * Returns a new [[KeyValueGroupedDataset]] where the given function has been applied to the + * data. The grouping key is unchanged by this. + * + * {{{ + * // Create values grouped by key from a Dataset[(K, V)] + * ds.groupByKey(_._1).mapValues(_._2) // Scala + * }}} + * @since 2.0.0 + */ + def mapValues[W: Encoder](func: V => W): KeyValueGroupedDataset[K, W] = { +val withNewData = AppendColumns(func, dataAttributes, logicalPlan) +val projected = Project(withNewData.newColumns ++ groupingAttributes, withNewData) +val executed = sparkSession.sessionState.executePlan(projected) + +new KeyValueGroupedDataset( + encoderFor[K], + encoderFor[W], + executed, + withNewData.newColumns, + groupingAttributes) + } + + /** + * Returns a new [[KeyValueGroupedDataset]] where the given function has been applied to the + * data. The grouping key is unchanged by this. + * + * {{{ + * // Create Integer values grouped by String key from a Dataset> + * Dataset> ds = ...; + * KeyValueGroupedDataset grouped = + * ds.groupByKey(t -> t._1, Encoders.STRING()).mapValues(t -> t._2, Encoders.INT()); // Java 8 + * }}} + * @since 2.0.0 --- End diff -- A new line before `@since`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13526#discussion_r69986420 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -65,6 +65,46 @@ class KeyValueGroupedDataset[K, V] private[sql]( groupingAttributes) /** + * Returns a new [[KeyValueGroupedDataset]] where the given function has been applied to the + * data. The grouping key is unchanged by this. + * + * {{{ + * // Create values grouped by key from a Dataset[(K, V)] + * ds.groupByKey(_._1).mapValues(_._2) // Scala + * }}} + * @since 2.0.0 + */ + def mapValues[W: Encoder](func: V => W): KeyValueGroupedDataset[K, W] = { +val withNewData = AppendColumns(func, dataAttributes, logicalPlan) +val projected = Project(withNewData.newColumns ++ groupingAttributes, withNewData) +val executed = sparkSession.sessionState.executePlan(projected) + +new KeyValueGroupedDataset( + encoderFor[K], + encoderFor[W], + executed, + withNewData.newColumns, + groupingAttributes) + } + + /** + * Returns a new [[KeyValueGroupedDataset]] where the given function has been applied to the --- End diff -- ...with the given function `func` applied to the data? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13526#discussion_r69986245 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -65,6 +65,46 @@ class KeyValueGroupedDataset[K, V] private[sql]( groupingAttributes) /** + * Returns a new [[KeyValueGroupedDataset]] where the given function has been applied to the + * data. The grouping key is unchanged by this. + * + * {{{ + * // Create values grouped by key from a Dataset[(K, V)] + * ds.groupByKey(_._1).mapValues(_._2) // Scala + * }}} + * @since 2.0.0 + */ + def mapValues[W: Encoder](func: V => W): KeyValueGroupedDataset[K, W] = { --- End diff -- ...while here `W: Encoder` only after `:`. Why is this inconsistency? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13526#discussion_r69986179 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -175,6 +175,17 @@ object AppendColumns { encoderFor[U].namedExpressions, child) } + + def apply[T : Encoder, U : Encoder]( --- End diff -- Here you use `T : Encoder`, i.e. with spaces before and after `:` while... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14087#discussion_r69985379 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala --- @@ -331,6 +331,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("read from textfile") { +withTempDirs { case (src, tmp) => + val textStream = spark.readStream.textFile(src.getCanonicalPath) + val filtered = textStream.filter($"value" contains "keep") + + testStream(filtered)( +AddTextFileData("drop1\nkeep2\nkeep3", src, tmp), +CheckAnswer("keep2", "keep3"), +StopStream, +AddTextFileData("drop4\nkeep5\nkeep6", src, tmp), +StartStream(), --- End diff -- Just wondering why `()` are here while not for `StopStream`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14087#discussion_r69985100 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala --- @@ -281,6 +281,31 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo @Experimental def text(path: String): DataFrame = format("text").load(path) + /** + * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset + * contains a single string column named "value". + * + * If the directory structure of the text files contains partitioning information, those are + * ignored in the resulting Dataset. To include partitioning information as columns, use `text`. + * + * Each line in the text files is a new element in the resulting Dataset. For example: + * {{{ + * // Scala: + * spark.read.textFile("/path/to/spark/README.md") + * + * // Java: + * spark.read().textFile("/path/to/spark/README.md") + * }}} + * + * @param path input path + * @since 2.0.0 + */ + def textFile(path: String): Dataset[String] = { +if (userSpecifiedSchema.nonEmpty) { + throw new AnalysisException("User specified schema not supported with `textFile`") +} + text(path).select("value").as[String](sparkSession.implicits.newStringEncoder) --- End diff -- I'm surprised that `sparkSession.implicits.newStringEncoder` is required here? Why is `sparkSession.implicits._` not imported here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
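Presumably the intent is something like the sketch below, importing the session's implicits inside the method so the string encoder is picked up implicitly. Whether such an import is desirable inside `DataStreamReader` is a separate question, so this is only an illustration.

```scala
def textFile(path: String): Dataset[String] = {
  if (userSpecifiedSchema.nonEmpty) {
    throw new AnalysisException("User specified schema not supported with `textFile`")
  }
  // Sketch only: the import brings newStringEncoder into implicit scope.
  import sparkSession.implicits._
  text(path).select("value").as[String]
}
```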
[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14087#discussion_r69985212 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala --- @@ -281,6 +281,31 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo @Experimental def text(path: String): DataFrame = format("text").load(path) + /** + * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset + * contains a single string column named "value". + * + * If the directory structure of the text files contains partitioning information, those are + * ignored in the resulting Dataset. To include partitioning information as columns, use `text`. + * + * Each line in the text files is a new element in the resulting Dataset. For example: + * {{{ + * // Scala: + * spark.read.textFile("/path/to/spark/README.md") + * + * // Java: + * spark.read().textFile("/path/to/spark/README.md") --- End diff -- s/read/readStream? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14087#discussion_r69985195 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala --- @@ -281,6 +281,31 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo @Experimental def text(path: String): DataFrame = format("text").load(path) + /** + * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset + * contains a single string column named "value". + * + * If the directory structure of the text files contains partitioning information, those are + * ignored in the resulting Dataset. To include partitioning information as columns, use `text`. + * + * Each line in the text files is a new element in the resulting Dataset. For example: + * {{{ + * // Scala: + * spark.read.textFile("/path/to/spark/README.md") --- End diff -- s/read/readStream? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14087#discussion_r69984805 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala --- @@ -281,6 +281,31 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo @Experimental def text(path: String): DataFrame = format("text").load(path) + /** + * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset + * contains a single string column named "value". + * + * If the directory structure of the text files contains partitioning information, those are + * ignored in the resulting Dataset. To include partitioning information as columns, use `text`. + * + * Each line in the text files is a new element in the resulting Dataset. For example: + * {{{ + * // Scala: + * spark.read.textFile("/path/to/spark/README.md") + * + * // Java: + * spark.read().textFile("/path/to/spark/README.md") + * }}} + * + * @param path input path + * @since 2.0.0 + */ + def textFile(path: String): Dataset[String] = { +if (userSpecifiedSchema.nonEmpty) { + throw new AnalysisException("User specified schema not supported with `textFile`") --- End diff -- user-specified --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14087#discussion_r69984678 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala --- @@ -281,6 +281,31 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo @Experimental def text(path: String): DataFrame = format("text").load(path) + /** + * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset + * contains a single string column named "value". + * + * If the directory structure of the text files contains partitioning information, those are + * ignored in the resulting Dataset. To include partitioning information as columns, use `text`. + * + * Each line in the text files is a new element in the resulting Dataset. For example: --- End diff -- s/element/record? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14087#discussion_r69984584 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala --- @@ -281,6 +281,31 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo @Experimental def text(path: String): DataFrame = format("text").load(path) + /** + * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset --- End diff -- a text file? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14083: [SPARK-16406][SQL] Improve performance of Logical...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14083#discussion_r69982767 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala --- @@ -165,111 +169,99 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { def resolveQuoted( name: String, resolver: Resolver): Option[NamedExpression] = { -resolve(UnresolvedAttribute.parseAttributeName(name), output, resolver) + outputAttributeResolver.resolve(UnresolvedAttribute.parseAttributeName(name), resolver) } /** - * Resolve the given `name` string against the given attribute, returning either 0 or 1 match. - * - * This assumes `name` has multiple parts, where the 1st part is a qualifier - * (i.e. table name, alias, or subquery alias). - * See the comment above `candidates` variable in resolve() for semantics the returned data. + * Refreshes (or invalidates) any metadata/data cached in the plan recursively. */ - private def resolveAsTableColumn( - nameParts: Seq[String], - resolver: Resolver, - attribute: Attribute): Option[(Attribute, List[String])] = { -assert(nameParts.length > 1) -if (attribute.qualifier.exists(resolver(_, nameParts.head))) { - // At least one qualifier matches. See if remaining parts match. - val remainingParts = nameParts.tail - resolveAsColumn(remainingParts, resolver, attribute) -} else { - None -} + def refresh(): Unit = children.foreach(_.refresh()) +} + +/** + * Helper class for (LogicalPlan) attribute resolution. This class indexes attributes by their + * case-in-sensitive name, and checks potential candidates using the given Resolver. Both qualified --- End diff -- case-insensitive? When you say "the given Resolver", what do you mean by "Resolver"? Can we link to the type? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14080: [SPARK-16405] Add metrics and source for external...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14080#discussion_r69981791 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java --- @@ -143,4 +179,26 @@ private void checkAuth(TransportClient client, String appId) { } } + + /** + * A simple class to wrap all shuffle service wrapper metrics + */ + private class ShuffleMetrics implements MetricSet { +private final Map allMetrics; +private final Timer timeDelayForOpenBlockRequest = new Timer(); +private final Timer timeDelayForRegisterExecutorRequest = new Timer(); +private final Meter transferBlockRate = new Meter(); + +private ShuffleMetrics() { + allMetrics = new HashMap<>(); --- End diff -- Will it work with Java 7? I think Spark 2.0 will keep supporting that version. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13981: [SPARK-16307] [ML] Add test to verify the predict...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13981#discussion_r69703134 --- Diff: mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala --- @@ -96,6 +97,25 @@ class DecisionTreeRegressorSuite assert(variance === expectedVariance, s"Expected variance $expectedVariance but got $variance.") } + +val varianceData: RDD[LabeledPoint] = TreeTests.varianceData(sc) +val varianceDF = TreeTests.setMetadata(varianceData, Map.empty[Int, Int], 0) +dt.setMaxDepth(1) + .setMaxBins(6) + .setSeed(0) +val transformVarDF = dt.fit(varianceDF).transform(varianceDF) +val calculatedVariances = transformVarDF.select(dt.getVarianceCol).collect().map { --- End diff -- Do the conversion before `collect` so you work with a type-safe `Dataset` (not a `DataFrame`, which is `Dataset[Row]`) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
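A possible reading of this comment, as a sketch only: it assumes `dt.getVarianceCol` is the variance column name, its values are doubles, and an implicit `Encoder[Double]` is in scope (e.g. via `spark.implicits._`).

```scala
// Sketch only: stay in a typed Dataset[Double] until collect.
val calculatedVariances: Array[Double] =
  transformVarDF.select(dt.getVarianceCol).as[Double].collect()
```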
[GitHub] spark pull request #13981: [SPARK-16307] [ML] Add test to verify the predict...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/13981#discussion_r69631781 --- Diff: mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala --- @@ -96,6 +97,25 @@ class DecisionTreeRegressorSuite assert(variance === expectedVariance, s"Expected variance $expectedVariance but got $variance.") } + +val varianceData: RDD[LabeledPoint] = TreeTests.varianceData(sc) +val varianceDF = TreeTests.setMetadata(varianceData, Map.empty[Int, Int], 0) +dt.setMaxDepth(1) + .setMaxBins(6) + .setSeed(0) +val transformVarDF = dt.fit(varianceDF).transform(varianceDF) +val calculatedVariances = transformVarDF.select(dt.getVarianceCol).collect().map { --- End diff -- I wonder if `toDS` would work here and make `variance` "simpler". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14034: [SPARK-16355] [SPARK-16354] [SQL] Fix Bugs When L...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14034#discussion_r69528969 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala --- @@ -31,4 +33,46 @@ class StatisticsSuite extends QueryTest with SharedSQLContext { spark.sessionState.conf.autoBroadcastJoinThreshold) } + test("estimates the size of limit") { +withTempTable("test") { + Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v") +.createOrReplaceTempView("test") + Seq((0, 1), (1, 24), (2, 48)).foreach { case (limit, expected) => +val df = sql(s"""SELECT * FROM test limit $limit""") + +val sizesGlobalLimit = df.queryExecution.analyzed.collect { case g: GlobalLimit => + g.statistics.sizeInBytes +} +assert(sizesGlobalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}") +assert(sizesGlobalLimit(0).equals(BigInt(expected)), + s"expected exact size 24 for table 'test', got: ${sizesGlobalLimit(0)}") + +val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: LocalLimit => + l.statistics.sizeInBytes +} +assert(sizesLocalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}") +assert(sizesLocalLimit(0).equals(BigInt(expected)), + s"expected exact size 24 for table 'test', got: ${sizesLocalLimit(0)}") + } +} + } + + test("estimates the size of a limit 0 on outer join") { +withTempTable("test") { + Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v") +.createOrReplaceTempView("test") + val df1 = spark.table("test") + val df2 = spark.table("test").limit(0) + val df = df1.join(df2, Seq("k"), "left") + + val sizes = df.queryExecution.analyzed.collect { case g: Join => +g.statistics.sizeInBytes + } + + assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizes(0).equals(BigInt(96)), --- End diff -- Why do you `equals`? Would `===` not work here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14047: [SPARK-16368] [SQL] Fix Strange Errors When Creat...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14047#discussion_r69527187 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -391,6 +391,29 @@ class HiveDDLSuite } } + + test("create view with mismatched schema") { --- End diff -- Yes. You're right. I must've overlooked this not-so-little difference. Is `temporary` the only difference between the tests? Could we have a template test method to generate proper versions for the case? (even if it were too much for this change, it'd pave the way for more tests like this in the future). We could also have a follow-up change after this one is committed. WDYT @srowen @rxin? Is this worth the effort? I'm concerned about this duplication. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
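One shape such a template could take, with every name and the shared body purely illustrative (the two existing tests are not shown in the diff, and the assumption is that they differ only in the TEMPORARY keyword):

```scala
// Sketch only: generate the temporary and non-temporary variants from one template.
private def testViewWithMismatchedSchema(temporary: Boolean): Unit = {
  val viewKind = if (temporary) "TEMPORARY VIEW" else "VIEW"
  test(s"create ${viewKind.toLowerCase} with mismatched schema") {
    // ... shared body, parameterized on s"CREATE $viewKind ..." ...
  }
}

testViewWithMismatchedSchema(temporary = true)
testViewWithMismatchedSchema(temporary = false)
```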
[GitHub] spark pull request #14034: [SPARK-16355] [SPARK-16354] [SQL] Fix Bugs When L...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14034#discussion_r69488260 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -251,6 +251,22 @@ trait CheckAnalysis extends PredicateHelper { s"but one table has '${firstError.output.length}' columns and another table has " + s"'${s.children.head.output.length}' columns") + case l: GlobalLimit => +val numRows = l.limitExpr.eval().asInstanceOf[Int] +if (numRows < 0) { + failAnalysis( +s"number_rows in limit clause must be equal to or greater than 0. " + + s"number_rows:$numRows") +} + + case l: LocalLimit => --- End diff -- What do you think about merging the two cases to `case l @ (_: LocalLimit | _: GlobalLimit) =>` to remove the duplication (or at least introduce a local method). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
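Either option might end up looking roughly like the fragment below; the helper name is invented here, the helper would live next to the existing `match`, and the two cases would slot into it. A shared helper also sidesteps the fact that a merged `case l @ (_: LocalLimit | _: GlobalLimit)` pattern would still need a common way to reach `limitExpr`.

```scala
// Sketch only: share the limit check instead of duplicating it per case.
def checkLimitClause(limitExpr: Expression): Unit = {
  val numRows = limitExpr.eval().asInstanceOf[Int]
  if (numRows < 0) {
    failAnalysis(
      s"number_rows in limit clause must be equal to or greater than 0. " +
        s"number_rows:$numRows")
  }
}

// ... inside the existing match:
case GlobalLimit(limitExpr, _) => checkLimitClause(limitExpr)
case LocalLimit(limitExpr, _) => checkLimitClause(limitExpr)
```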
[GitHub] spark pull request #14047: [SPARK-16368] [SQL] Fix Strange Errors When Creat...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14047#discussion_r69487586 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -391,6 +391,29 @@ class HiveDDLSuite } } + test("create view with mismatched schema") { --- End diff -- Did you copy the tests? Could you file a JIRA issue to get rid of the duplication at the very least? Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69432798 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala --- @@ -55,7 +56,7 @@ class LogisticRegressionSuite generateMultinomialLogisticInput(coefficients, xMean, xVariance, addIntercept = true, nPoints, 42) - spark.createDataFrame(sc.parallelize(testData, 4)) + sc.parallelize(testData, 4).toDF() --- End diff -- It'd be nice to know what the purpose of the explicit partition setting was. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69391176 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala --- @@ -282,9 +281,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { val z = Vectors.dense(4.0).asML val p = (5.0, z) val w = Vectors.dense(6.0) -val df = spark.createDataFrame(Seq( - (0, x, y, p, w) -)).toDF("id", "x", "y", "p", "w") +val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w") .withColumn("x", col("x"), metadata) --- End diff -- Replace `col("x")` with `$"x"` or (better) `'x` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69391151 --- Diff: mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala --- @@ -52,23 +53,20 @@ class GeneralizedLinearRegressionSuite import GeneralizedLinearRegressionSuite._ -datasetGaussianIdentity = spark.createDataFrame( - sc.parallelize(generateGeneralizedLinearRegressionInput( -intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), -xVariance = Array(0.7, 1.2), nPoints = 1, seed, noiseLevel = 0.01, -family = "gaussian", link = "identity"), 2)) +datasetGaussianIdentity = sc.parallelize(generateGeneralizedLinearRegressionInput( --- End diff -- Why is this `sc.parallelize` needed here? Why are `2` partitions used? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69391139 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala --- @@ -102,7 +103,7 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext } test("Cannot fit an empty DataFrame") { -val rdd = spark.createDataFrame(sc.parallelize(Array.empty[Vector], 2).map(FeatureData)) +val rdd = sc.parallelize(Array.empty[Vector], 2).map(FeatureData).toDF() --- End diff -- Do you need `sc.parallelize`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69391120 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -39,7 +40,7 @@ class StringIndexerSuite test("StringIndexer") { val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")), 2) --- End diff -- Could you remove `sc.parallelize`, too? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
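In other words, with `testImplicits` in scope the local collection can be converted directly; the column names below are illustrative and the explicit partition count is dropped on the assumption it does not matter for this test.

```scala
// Sketch only: build the DataFrame straight from the local Seq.
val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
val df = data.toDF("id", "label")
```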
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69390423 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala --- @@ -29,10 +29,11 @@ import org.apache.spark.sql.types._ class OneHotEncoderSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + import testImplicits._ def stringIndexed(): DataFrame = { val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")), 2) --- End diff -- Remove `sc.parallelize`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69390388 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala --- @@ -61,7 +62,7 @@ class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa Vectors.sparse(3, Seq()) ) -dataFrame = spark.createDataFrame(sc.parallelize(data, 2).map(NormalizerSuite.FeatureData)) +dataFrame = sc.parallelize(data, 2).map(NormalizerSuite.FeatureData).toDF() --- End diff -- Remove `sc.parallelize` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69390273 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala --- @@ -57,8 +58,7 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De test("MinMaxScaler arguments max must be larger than min") { withClue("arguments max must be larger than min") { - val dummyDF = spark.createDataFrame(Seq( -(1, Vectors.dense(1.0, 2.0.toDF("id", "feature") + val dummyDF = Seq((1, Vectors.dense(1.0, 2.0))).toDF("id", "feature") --- End diff -- It's just a column name, but for consistency...`features` (not `feature`) (unless there's a reason for this) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69390216 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/CountVectorizerSuite.scala --- @@ -44,7 +45,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext (3, split(""), Vectors.sparse(4, Seq())), // empty string --- End diff -- Replace the comment `// empty string` with `val EMPTY_STRING = ""` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69390204 --- Diff: mllib/src/test/scala/org/apache/spark/ml/evaluation/RegressionEvaluatorSuite.scala --- @@ -42,9 +43,10 @@ class RegressionEvaluatorSuite * data.map(x=> x.label + ", " + x.features(0) + ", " + x.features(1)) * .saveAsTextFile("path") */ -val dataset = spark.createDataFrame( - sc.parallelize(LinearDataGenerator.generateLinearInput( -6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2).map(_.asML)) +val dataset = sc.parallelize( --- End diff -- Remove `sc.parallelize` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69390189 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala --- @@ -158,7 +159,7 @@ class RandomForestClassifierSuite } test("Fitting without numClasses in metadata") { -val df: DataFrame = spark.createDataFrame(TreeTests.featureImportanceData(sc)) +val df: DataFrame = TreeTests.featureImportanceData(sc).toDF() --- End diff -- Why is the type annotation needed here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69390185 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala --- @@ -55,7 +56,7 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau val xVariance = Array(0.6856, 0.1899, 3.116, 0.581) rdd = sc.parallelize(generateMultinomialLogisticInput( coefficients, xMean, xVariance, true, nPoints, 42), 2) -dataset = spark.createDataFrame(rdd) +dataset = rdd.toDF() --- End diff -- Merge it with line 57. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69390178 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala --- @@ -47,7 +48,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa Array(0.10, 0.10, 0.70, 0.10) // label 2 ).map(_.map(math.log)) -dataset = spark.createDataFrame(generateNaiveBayesInput(pi, theta, 100, 42)) +dataset = generateNaiveBayesInput(pi, theta, 100, 42).toDF() --- End diff -- Exactly my point above :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69390173 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala --- @@ -116,7 +117,7 @@ class MultilayerPerceptronClassifierSuite // the input seed is somewhat magic, to make this test pass val rdd = sc.parallelize(generateMultinomialLogisticInput( coefficients, xMean, xVariance, true, nPoints, 1), 2) -val dataFrame = spark.createDataFrame(rdd).toDF("label", "features") +val dataFrame = rdd.toDF("label", "features") --- End diff -- Could we merge this line with 118? I don't think 118 needs `sc.parallelize`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
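The merged form could look something like the sketch below, assuming the suite's implicits are in scope and the generated input does not actually need to be parallelized into 2 partitions for this test.

```scala
// Sketch only: go straight from the generated Seq to a DataFrame.
val dataFrame = generateMultinomialLogisticInput(
  coefficients, xMean, xVariance, true, nPoints, 1).toDF("label", "features")
```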
[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/14035#discussion_r69390144 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala --- @@ -55,7 +56,7 @@ class LogisticRegressionSuite generateMultinomialLogisticInput(coefficients, xMean, xVariance, addIntercept = true, nPoints, 42) - spark.createDataFrame(sc.parallelize(testData, 4)) + sc.parallelize(testData, 4).toDF() --- End diff -- `testData.toDF.repartition(4)`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org