[GitHub] spark pull request #14414: [SPARK-16809] enable history server links in disp...

2016-08-01 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14414#discussion_r73032820
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -364,7 +377,12 @@ private[spark] class MesosClusterScheduler(
   val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
   val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
 
-  driverEnv ++ executorEnv ++ desc.command.environment
+  var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+    v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+  )
+

--- End diff --

Remove the new line?





[GitHub] spark pull request #14414: [SPARK-16809] enable history server links in disp...

2016-08-01 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14414#discussion_r73032511
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala ---
@@ -28,10 +28,17 @@ import 
org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends 
WebUIPage("") {
+  private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
+
   def render(request: HttpServletRequest): Seq[Node] = {
 val state = parent.scheduler.getSchedulerState()
-val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
-val driverHeaders = queuedHeaders ++
+
+val driverHeader = Seq("Driver ID")
--- End diff --

Why is a header a Seq?!
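A sketch of the likely reason, inferred from the removed `driverHeaders = queuedHeaders ++ ...` line above (the exact grouping is an assumption): keeping each header group as a `Seq` lets optional groups be concatenated with `++`, an empty group simply contributing nothing.

```
val driverHeader = Seq("Driver ID")
val historyHeader = if (historyServerURL.isDefined) Seq("History") else Nil
val driverHeaders = driverHeader ++ historyHeader ++
  Seq("Submit Date", "Main Class", "Driver Resources")
```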





[GitHub] spark pull request #14414: [SPARK-16809] enable history server links in disp...

2016-08-01 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14414#discussion_r73032355
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala ---
@@ -28,10 +28,17 @@ import 
org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends 
WebUIPage("") {
+  private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
+
   def render(request: HttpServletRequest): Seq[Node] = {
 val state = parent.scheduler.getSchedulerState()
-val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
-val driverHeaders = queuedHeaders ++
+
+val driverHeader = Seq("Driver ID")
+val historyHeader = if (historyServerURL.isDefined) Seq("History") else Nil
--- End diff --

`map.getOrElse`?
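Spelled out, the suggestion would be something like this (a sketch of the `Option` idiom, not necessarily the final form):

```
val historyHeader = historyServerURL.map(_ => Seq("History")).getOrElse(Nil)
```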





[GitHub] spark pull request #14414: [SPARK-16809] enable history server links in disp...

2016-08-01 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14414#discussion_r73032190
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala ---
@@ -28,10 +28,17 @@ import 
org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends 
WebUIPage("") {
+  private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
--- End diff --

Would it be possible to define a constant for "spark.mesos.dispatcher.historyServer.url" (as is done in the SQL and YARN modules)? See 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala.
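
A sketch of what that could look like, following the `ConfigBuilder` pattern in yarn/config.scala (the package object, entry name, and doc string here are assumptions):

```
package org.apache.spark.deploy.mesos

import org.apache.spark.internal.config.ConfigBuilder

package object config {

  private[spark] val HISTORY_SERVER_URL =
    ConfigBuilder("spark.mesos.dispatcher.historyServer.url")
      .doc("Base URL of the history server that the dispatcher UI links to.")
      .stringConf
      .createOptional
}
```

The page could then read it with something like `parent.conf.get(config.HISTORY_SERVER_URL)` instead of a raw string key.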





[GitHub] spark pull request #14292: [SPARK-14131][SQL][STREAMING] Improved fix for avo...

2016-07-25 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r72148282
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---
@@ -247,6 +248,46 @@ private[sql] trait SQLTestUtils
   }
 }
   }
+
+  /** Run a test on a separate [[UninterruptibleThread]]. */
+  protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false)
+(body: => Unit): Unit = {
+val timeoutMillis = 1
+var ex: Throwable = null
+
+def runOnThread(): Unit = {
+  val thread = new UninterruptibleThread(s"Testing thread for test $name") {
+override def run(): Unit = {
+  try {
+body
+  } catch {
+case NonFatal(e) =>
+  ex = e
+  }
+}
+  }
+  thread.setDaemon(true)
+  thread.start()
+  thread.join(timeoutMillis)
+  if (thread.isAlive) {
+thread.interrupt()
+// If this interrupt does not work, then this thread is most likely running something that
+// is not interruptible. There is not much point in waiting for the thread to terminate, and
+// we would rather let the JVM terminate the thread on exit.
+fail(
+  s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" +
+    s" $timeoutMillis ms")
+  } else if (ex != null) {
+throw ex
+  }
+}
+
+if (quietly) {
--- End diff --

What about this?

```
val f = if (quietly) testQuietly(name) else test(name)
f(runOnThread())
```





[GitHub] spark pull request #14292: [SPARK-14131][SQL][STREAMING] Improved fix for avo...

2016-07-25 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r72148009
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -269,19 +273,11 @@ class StreamExecution(
* batchId counter is incremented and a new log entry is written with the newest offsets.
*/
   private def constructNextBatch(): Unit = {
-// There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
-// If we interrupt some thread running Shell.runCommand, we may hit this issue.
-// As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
-// to set the file permission, we should not interrupt "microBatchThread" when running this
-// method. See SPARK-14131.
-//
 // Check to see what new data is available.
 val hasNewData = {
   awaitBatchLock.lock()
   try {
-val newData = microBatchThread.runUninterruptibly {
-  uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-}
+val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
--- End diff --

Gave it a longer thought. I'm not using for comprehensions very often, but when I do... What do you think about this?

```
val newData = for {
  source <- uniqueSources
  offset <- source.getOffset
} yield (source, offset)
```
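
For reference, that for comprehension desugars into essentially the expression already in the diff, so it is purely a readability change:

```
val newData = uniqueSources.flatMap(source => source.getOffset.map(offset => source -> offset))
```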





[GitHub] spark pull request #14292: [SPARK-14131][SQL][STREAMING] Improved fix for avo...

2016-07-25 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r72137376
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -269,19 +273,11 @@ class StreamExecution(
* batchId counter is incremented and a new log entry is written with the newest offsets.
*/
   private def constructNextBatch(): Unit = {
-// There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
-// If we interrupt some thread running Shell.runCommand, we may hit this issue.
-// As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
-// to set the file permission, we should not interrupt "microBatchThread" when running this
-// method. See SPARK-14131.
-//
 // Check to see what new data is available.
 val hasNewData = {
   awaitBatchLock.lock()
   try {
-val newData = microBatchThread.runUninterruptibly {
-  uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-}
+val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
--- End diff --

I don't like it either, but... couldn't it be that the line is trying to do more than it really should? Perhaps the code should be two simpler functions composed? Just a wild thought... I don't want to hold it back.
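
One way to split it (a sketch; the helper name is made up, and `Source`/`Offset` are the streaming types assumed from the surrounding code):

```
private def availableOffset(source: Source): Option[(Source, Offset)] =
  source.getOffset.map(offset => source -> offset)

val newData = uniqueSources.flatMap(availableOffset)
```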





[GitHub] spark pull request #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning P...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14322#discussion_r71992661
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---
@@ -135,9 +135,17 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
 INPUT_PATHS -> fsRelation.location.paths.mkString(", "))
 
+  // If the required attributes do not include the partitioning columns, we do not need
+  // to scan the partitioning columns. If partitioning columns are selected, the column order
+  // of partitionColumns is fixed in the RDD. Thus, we always scan all the partitioning columns.
+  val scannedColumns = if (requiredAttributes.intersect(partitionSet).nonEmpty) {
--- End diff --

What do you think about `map` and `getOrElse`?





[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14326#discussion_r71992598
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala ---

[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14326#discussion_r71992548
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala ---

[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14326#discussion_r71992524
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala ---

[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14326#discussion_r71992526
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala ---

[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14326#discussion_r71992494
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala ---

[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14326#discussion_r71992474
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala ---

[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14326#discussion_r71992412
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala ---

[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14326#discussion_r71992373
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala ---
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.regression
+
+import scala.collection.mutable
+
+import breeze.linalg.{DenseVector => BDV}
+import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, LBFGSB => BreezeLBFGSB}
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.Since
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.PredictorParams
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.ml.linalg.BLAS._
+import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators}
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Params for robust regression.
+ */
+private[regression] trait RobustRegressionParams extends PredictorParams with HasRegParam
+  with HasMaxIter with HasTol with HasFitIntercept with HasStandardization with HasWeightCol {
+
+  /**
+   * The shape parameter to control the amount of robustness. Must be > 1.0.
+   * At larger values of M, the huber criterion becomes more similar to least squares regression;
+   * for small values of M, the criterion is more similar to L1 regression.
+   * Default is 1.35 to get as much robustness as possible while retaining
+   * 95% statistical efficiency for normally distributed data.
+   */
+  @Since("2.1.0")
+  final val m = new DoubleParam(this, "m", "The shape parameter to control the amount of " +
+    "robustness. Must be > 1.0.", ParamValidators.gt(1.0))
+
+  /** @group getParam */
+  @Since("2.1.0")
+  def getM: Double = $(m)
+}
+
+/**
+ * Robust regression.
+ *
+ * The learning objective is to minimize the huber loss, with regularization.
+ *
+ * The robust regression optimizes the squared loss for the samples where
+ * {{{ |\frac{(y - X \beta)}{\sigma}|\leq M }}}
+ * and the absolute loss for the samples where
+ * {{{ |\frac{(y - X \beta)}{\sigma}|\geq M }}},
+ * where \beta and \sigma are parameters to be optimized.
+ *
+ * This supports two types of regularization: None and L2.
+ *
+ * This estimator is different from the R implementation of Robust Regression
+ * ([[http://www.ats.ucla.edu/stat/r/dae/rreg.htm]]) because the R implementation does a
+ * weighted least squares implementation with weights given to each sample on the basis
+ * of how much the residual is greater than a certain threshold.
+ */
+@Since("2.1.0")
+class RobustRegression @Since("2.1.0") (@Since("2.1.0") override val uid: String)
+  extends Regressor[Vector, RobustRegression, RobustRegressionModel]
+  with RobustRegressionParams with Logging {
+
+  @Since("2.1.0")
--- End diff --

I don't think you need `@Since` on every symbol in a class that is itself annotated `@Since` with the same version.





[GitHub] spark pull request #14326: [SPARK-3181] [ML] Implement RobustRegression with...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14326#discussion_r71992352
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/RobustRegression.scala ---
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.regression
+
+import scala.collection.mutable
+
+import breeze.linalg.{DenseVector => BDV}
+import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => 
BreezeLBFGS, LBFGSB => BreezeLBFGSB}
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.Since
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.PredictorParams
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.ml.linalg.BLAS._
+import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators}
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Params for robust regression.
+ */
+private[regression] trait RobustRegressionParams extends PredictorParams with HasRegParam
+  with HasMaxIter with HasTol with HasFitIntercept with HasStandardization with HasWeightCol {
+
+  /**
+   * The shape parameter to control the amount of robustness. Must be > 1.0.
+   * At larger values of M, the huber criterion becomes more similar to least squares regression;
+   * for small values of M, the criterion is more similar to L1 regression.
+   * Default is 1.35 to get as much robustness as possible while retaining
+   * 95% statistical efficiency for normally distributed data.
+   */
+  @Since("2.1.0")
+  final val m = new DoubleParam(this, "m", "The shape parameter to control the amount of " +
+    "robustness. Must be > 1.0.", ParamValidators.gt(1.0))
+
+  /** @group getParam */
+  @Since("2.1.0")
+  def getM: Double = $(m)
+}
+
+/**
+ * Robust regression.
+ *
+ * The learning objective is to minimize the huber loss, with regularization.
+ *
+ * The robust regression optimizes the squared loss for the samples where
+ * {{{ |\frac{(y - X \beta)}{\sigma}|\leq M }}}
+ * and the absolute loss for the samples where
+ * {{{ |\frac{(y - X \beta)}{\sigma}|\geq M }}},
+ * where \beta and \sigma are parameters to be optimized.
+ *
+ * This supports two types of regularization: None and L2.
+ *
+ * This estimator is different from the R implementation of Robust Regression
+ * ([[http://www.ats.ucla.edu/stat/r/dae/rreg.htm]]) because the R implementation does a
+ * weighted least squares implementation with weights given to each sample on the basis
+ * of how much the residual is greater than a certain threshold.
+ */
+@Since("2.1.0")
+class RobustRegression @Since("2.1.0") (@Since("2.1.0") override val uid: String)
--- End diff --

Are all `@Since` required? I'd think the one on line 82 would be enough.





[GitHub] spark pull request #14327: [SPARK-16686][SQL] Project shouldn't be pushed do...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14327#discussion_r71992303
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -422,6 +422,32 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   3, 17, 27, 58, 62)
   }
 
+  test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") {
+val udfOne = spark.udf.register("udfOne", (n: Int) => {
+  require(n != 1, "udfOne shouldn't see swid=1!")
+  1
+})
+
+val d = Seq(
+  (0, "string0"),
+  (1, "string1"),
+  (2, "string2"),
+  (3, "string3"),
+  (4, "string4"),
+  (5, "string5"),
+  (6, "string6"),
+  (7, "string7"),
+  (8, "string8"),
+  (9, "string9")
+)
+val df = spark.createDataFrame(d).toDF("swid", "stringData")
+val sampleDF = df.sample(false, 0.7, 50)
+// After sampling, sampleDF doesn't contain swid=1.
+assert(!sampleDF.select("swid").collect.contains(1))
+// udfOne should not encounter swid=1.
+sampleDF.select(udfOne($"swid")).collect
--- End diff --

I assume you're calling `collect` to trigger `assert`, aren't you? If so, 
why don't you return `true`/`false` to denote it and do `assert` here instead?
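
A sketch of that idea, reusing the names from the diff above (turning the check into a boolean-returning UDF is an assumption about how it could be restructured):

```
val udfSawOne = udf { n: Int => n == 1 }
val sawOne = sampleDF.select(udfSawOne($"swid")).collect().exists(_.getBoolean(0))
assert(!sawOne, "the sampled data should not contain swid=1")
```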





[GitHub] spark pull request #14327: [SPARK-16686][SQL] Project shouldn't be pushed do...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14327#discussion_r71992270
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -422,6 +422,32 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   3, 17, 27, 58, 62)
   }
 
+  test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") {
+val udfOne = spark.udf.register("udfOne", (n: Int) => {
--- End diff --

Is there a reason why you use `spark.udf.register` rather than `udf` directly?

```
val udfOne = udf { n: Int => ... }
```
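
Filled in with the body from the diff above, that would be roughly:

```
import org.apache.spark.sql.functions.udf

val udfOne = udf { n: Int =>
  require(n != 1, "udfOne shouldn't see swid=1!")
  1
}
```

(`spark.udf.register` is mainly needed when the function also has to be callable by name from SQL.)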





[GitHub] spark pull request #14327: [SPARK-16686][SQL] Project shouldn't be pushed do...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14327#discussion_r71992254
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
---
@@ -422,6 +422,32 @@ class DatasetSuite extends QueryTest with 
SharedSQLContext {
   3, 17, 27, 58, 62)
   }
 
+  test("SPARK-16686: Dataset.sample with seed results shouldn't depend on 
downstream usage") {
+val udfOne = spark.udf.register("udfOne", (n: Int) => {
+  require(n != 1, "udfOne shouldn't see swid=1!")
+  1
+})
+
+val d = Seq(
+  (0, "string0"),
+  (1, "string1"),
+  (2, "string2"),
+  (3, "string3"),
+  (4, "string4"),
+  (5, "string5"),
+  (6, "string6"),
+  (7, "string7"),
+  (8, "string8"),
+  (9, "string9")
+)
+val df = spark.createDataFrame(d).toDF("swid", "stringData")
--- End diff --

`d.toDF(...)` should work too, shouldn't it?
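
i.e. something like this, with the suite's `spark.implicits._` in scope (just a sketch):

```
val df = d.toDF("swid", "stringData")
```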





[GitHub] spark pull request #14327: [SPARK-16686][SQL] Project shouldn't be pushed do...

2016-07-24 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14327#discussion_r71992242
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -150,13 +150,20 @@ class SimpleTestOptimizer extends Optimizer(
 
 /**
  * Pushes projects down beneath Sample to enable column pruning with 
sampling.
+ * This rule is only doable when the projects don't add new attributes.
  */
 object PushProjectThroughSample extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
 // Push down projection into sample
-case Project(projectList, Sample(lb, up, replace, seed, child)) =>
+case p @ Project(projectList, Sample(lb, up, replace, seed, child))
+if !hasNewOutput(projectList, p.child.output) =>
   Sample(lb, up, replace, seed, Project(projectList, child))()
   }
+  private def hasNewOutput(
+  projectList: Seq[NamedExpression],
+  childOutput: Seq[Attribute]): Boolean = {
+projectList.exists(p => !childOutput.exists(_.semanticEquals(p)))
--- End diff --

It's hard to understand what this code does -- two `exists` calls and a negation.
Can you "untangle" it?
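
For example, one possible untangling -- logically the same check, just phrased positively first (a sketch, not tested against the suite):

```
private def hasNewOutput(
    projectList: Seq[NamedExpression],
    childOutput: Seq[Attribute]): Boolean = {
  // the project introduces a new attribute iff not every project expression
  // has a semantically-equal counterpart in the child's output
  !projectList.forall(p => childOutput.exists(_.semanticEquals(p)))
}
```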





[GitHub] spark pull request #14313: [SPARK-16674][SQL] Avoid per-record type dispatch...

2016-07-23 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14313#discussion_r71977368
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 ---
@@ -407,84 +495,8 @@ private[sql] class JDBCRDD(
 var i = 0
 while (i < conversions.length) {
--- End diff --

Why `while` and not `foreach` or similar?
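
e.g. the same loop driven by `indices` (only a sketch of the shape -- the body is elided here exactly as in the diff):

```
conversions.indices.foreach { i =>
  // ... same per-element work as inside the while loop ...
}
```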





[GitHub] spark pull request #14313: [SPARK-16674][SQL] Avoid per-record type dispatch...

2016-07-23 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14313#discussion_r71977344
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 ---
@@ -322,46 +322,134 @@ private[sql] class JDBCRDD(
 }
   }
 
-  // Each JDBC-to-Catalyst conversion corresponds to a tag defined here so 
that
-  // we don't have to potentially poke around in the Metadata once for 
every
-  // row.
-  // Is there a better way to do this?  I'd rather be using a type that
-  // contains only the tags I define.
-  abstract class JDBCConversion
-  case object BooleanConversion extends JDBCConversion
-  case object DateConversion extends JDBCConversion
-  case class  DecimalConversion(precision: Int, scale: Int) extends 
JDBCConversion
-  case object DoubleConversion extends JDBCConversion
-  case object FloatConversion extends JDBCConversion
-  case object IntegerConversion extends JDBCConversion
-  case object LongConversion extends JDBCConversion
-  case object BinaryLongConversion extends JDBCConversion
-  case object StringConversion extends JDBCConversion
-  case object TimestampConversion extends JDBCConversion
-  case object BinaryConversion extends JDBCConversion
-  case class ArrayConversion(elementConversion: JDBCConversion) extends 
JDBCConversion
+  // A `JDBCConversion` is responsible for converting a value from 
`ResultSet`
+  // to a value in a field for `InternalRow`.
+  private type JDBCConversion = (ResultSet, Int) => Any
+
+  // This `ArrayElementConversion` is responsible for converting elements 
in
+  // an array from `ResultSet`.
+  private type ArrayElementConversion = (Object) => Any
 
   /**
-   * Maps a StructType to a type tag list.
+   * Maps a StructType to conversions for each type.
*/
   def getConversions(schema: StructType): Array[JDBCConversion] =
 schema.fields.map(sf => getConversions(sf.dataType, sf.metadata))
 
   private def getConversions(dt: DataType, metadata: Metadata): 
JDBCConversion = dt match {
-case BooleanType => BooleanConversion
-case DateType => DateConversion
-case DecimalType.Fixed(p, s) => DecimalConversion(p, s)
-case DoubleType => DoubleConversion
-case FloatType => FloatConversion
-case IntegerType => IntegerConversion
-case LongType => if (metadata.contains("binarylong")) 
BinaryLongConversion else LongConversion
-case StringType => StringConversion
-case TimestampType => TimestampConversion
-case BinaryType => BinaryConversion
-case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata))
+case BooleanType =>
+  (rs: ResultSet, pos: Int) => rs.getBoolean(pos)
+
+case DateType =>
+  (rs: ResultSet, pos: Int) =>
+// DateTimeUtils.fromJavaDate does not handle null value, so we 
need to check it.
+val dateVal = rs.getDate(pos)
+if (dateVal != null) {
+  DateTimeUtils.fromJavaDate(dateVal)
+} else {
+  null
+}
+
+case DecimalType.Fixed(p, s) =>
+  (rs: ResultSet, pos: Int) =>
+val decimalVal = rs.getBigDecimal(pos)
+if (decimalVal == null) {
+  null
+} else {
+  Decimal(decimalVal, p, s)
+}
+
+case DoubleType =>
+  (rs: ResultSet, pos: Int) => rs.getDouble(pos)
+
+case FloatType =>
+  (rs: ResultSet, pos: Int) => rs.getFloat(pos)
+
+case IntegerType =>
+  (rs: ResultSet, pos: Int) => rs.getInt(pos)
+
+case LongType if metadata.contains("binarylong") =>
+  (rs: ResultSet, pos: Int) =>
+val bytes = rs.getBytes(pos)
+var ans = 0L
+var j = 0
+while (j < bytes.size) {
+  ans = 256 * ans + (255 & bytes(j))
+  j = j + 1
+}
+ans
+
+case LongType =>
+  (rs: ResultSet, pos: Int) => rs.getLong(pos)
+
+case StringType =>
+  (rs: ResultSet, pos: Int) =>
+// TODO(davies): use getBytes for better performance, if the 
encoding is UTF-8
+UTF8String.fromString(rs.getString(pos))
+
+case TimestampType =>
+  (rs: ResultSet, pos: Int) =>
+val t = rs.getTimestamp(pos)
+if (t != null) {
+  DateTimeUtils.fromJavaTimestamp(t)
+} else {
+  null
+}
+
+case BinaryType =>
+  (rs: ResultSet, pos: Int) => rs.getBytes(pos)
+
+case ArrayType(et, _) =>

[GitHub] spark pull request #14313: [SPARK-16674][SQL] Avoid per-record type dispatch...

2016-07-23 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14313#discussion_r71977337
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 ---
@@ -322,46 +322,134 @@ private[sql] class JDBCRDD(
 }
   }
 
-  // Each JDBC-to-Catalyst conversion corresponds to a tag defined here so 
that
-  // we don't have to potentially poke around in the Metadata once for 
every
-  // row.
-  // Is there a better way to do this?  I'd rather be using a type that
-  // contains only the tags I define.
-  abstract class JDBCConversion
-  case object BooleanConversion extends JDBCConversion
-  case object DateConversion extends JDBCConversion
-  case class  DecimalConversion(precision: Int, scale: Int) extends 
JDBCConversion
-  case object DoubleConversion extends JDBCConversion
-  case object FloatConversion extends JDBCConversion
-  case object IntegerConversion extends JDBCConversion
-  case object LongConversion extends JDBCConversion
-  case object BinaryLongConversion extends JDBCConversion
-  case object StringConversion extends JDBCConversion
-  case object TimestampConversion extends JDBCConversion
-  case object BinaryConversion extends JDBCConversion
-  case class ArrayConversion(elementConversion: JDBCConversion) extends 
JDBCConversion
+  // A `JDBCConversion` is responsible for converting a value from 
`ResultSet`
+  // to a value in a field for `InternalRow`.
+  private type JDBCConversion = (ResultSet, Int) => Any
+
+  // This `ArrayElementConversion` is responsible for converting elements 
in
+  // an array from `ResultSet`.
+  private type ArrayElementConversion = (Object) => Any
 
   /**
-   * Maps a StructType to a type tag list.
+   * Maps a StructType to conversions for each type.
*/
   def getConversions(schema: StructType): Array[JDBCConversion] =
 schema.fields.map(sf => getConversions(sf.dataType, sf.metadata))
 
   private def getConversions(dt: DataType, metadata: Metadata): 
JDBCConversion = dt match {
-case BooleanType => BooleanConversion
-case DateType => DateConversion
-case DecimalType.Fixed(p, s) => DecimalConversion(p, s)
-case DoubleType => DoubleConversion
-case FloatType => FloatConversion
-case IntegerType => IntegerConversion
-case LongType => if (metadata.contains("binarylong")) 
BinaryLongConversion else LongConversion
-case StringType => StringConversion
-case TimestampType => TimestampConversion
-case BinaryType => BinaryConversion
-case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata))
+case BooleanType =>
+  (rs: ResultSet, pos: Int) => rs.getBoolean(pos)
+
+case DateType =>
+  (rs: ResultSet, pos: Int) =>
+// DateTimeUtils.fromJavaDate does not handle null value, so we 
need to check it.
+val dateVal = rs.getDate(pos)
+if (dateVal != null) {
+  DateTimeUtils.fromJavaDate(dateVal)
+} else {
+  null
+}
+
+case DecimalType.Fixed(p, s) =>
+  (rs: ResultSet, pos: Int) =>
+val decimalVal = rs.getBigDecimal(pos)
+if (decimalVal == null) {
+  null
+} else {
+  Decimal(decimalVal, p, s)
+}
+
+case DoubleType =>
+  (rs: ResultSet, pos: Int) => rs.getDouble(pos)
+
+case FloatType =>
+  (rs: ResultSet, pos: Int) => rs.getFloat(pos)
+
+case IntegerType =>
+  (rs: ResultSet, pos: Int) => rs.getInt(pos)
+
+case LongType if metadata.contains("binarylong") =>
+  (rs: ResultSet, pos: Int) =>
+val bytes = rs.getBytes(pos)
+var ans = 0L
+var j = 0
+while (j < bytes.size) {
+  ans = 256 * ans + (255 & bytes(j))
+  j = j + 1
+}
+ans
+
+case LongType =>
+  (rs: ResultSet, pos: Int) => rs.getLong(pos)
+
+case StringType =>
+  (rs: ResultSet, pos: Int) =>
+// TODO(davies): use getBytes for better performance, if the 
encoding is UTF-8
+UTF8String.fromString(rs.getString(pos))
+
+case TimestampType =>
+  (rs: ResultSet, pos: Int) =>
+val t = rs.getTimestamp(pos)
+if (t != null) {
--- End diff --

same as above



[GitHub] spark pull request #14313: [SPARK-16674][SQL] Avoid per-record type dispatch...

2016-07-23 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14313#discussion_r71977329
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 ---
@@ -322,46 +322,134 @@ private[sql] class JDBCRDD(
 }
   }
 
-  // Each JDBC-to-Catalyst conversion corresponds to a tag defined here so 
that
-  // we don't have to potentially poke around in the Metadata once for 
every
-  // row.
-  // Is there a better way to do this?  I'd rather be using a type that
-  // contains only the tags I define.
-  abstract class JDBCConversion
-  case object BooleanConversion extends JDBCConversion
-  case object DateConversion extends JDBCConversion
-  case class  DecimalConversion(precision: Int, scale: Int) extends 
JDBCConversion
-  case object DoubleConversion extends JDBCConversion
-  case object FloatConversion extends JDBCConversion
-  case object IntegerConversion extends JDBCConversion
-  case object LongConversion extends JDBCConversion
-  case object BinaryLongConversion extends JDBCConversion
-  case object StringConversion extends JDBCConversion
-  case object TimestampConversion extends JDBCConversion
-  case object BinaryConversion extends JDBCConversion
-  case class ArrayConversion(elementConversion: JDBCConversion) extends 
JDBCConversion
+  // A `JDBCConversion` is responsible for converting a value from 
`ResultSet`
+  // to a value in a field for `InternalRow`.
+  private type JDBCConversion = (ResultSet, Int) => Any
+
+  // This `ArrayElementConversion` is responsible for converting elements 
in
+  // an array from `ResultSet`.
+  private type ArrayElementConversion = (Object) => Any
 
   /**
-   * Maps a StructType to a type tag list.
+   * Maps a StructType to conversions for each type.
*/
   def getConversions(schema: StructType): Array[JDBCConversion] =
 schema.fields.map(sf => getConversions(sf.dataType, sf.metadata))
 
   private def getConversions(dt: DataType, metadata: Metadata): 
JDBCConversion = dt match {
-case BooleanType => BooleanConversion
-case DateType => DateConversion
-case DecimalType.Fixed(p, s) => DecimalConversion(p, s)
-case DoubleType => DoubleConversion
-case FloatType => FloatConversion
-case IntegerType => IntegerConversion
-case LongType => if (metadata.contains("binarylong")) 
BinaryLongConversion else LongConversion
-case StringType => StringConversion
-case TimestampType => TimestampConversion
-case BinaryType => BinaryConversion
-case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata))
+case BooleanType =>
+  (rs: ResultSet, pos: Int) => rs.getBoolean(pos)
+
+case DateType =>
+  (rs: ResultSet, pos: Int) =>
+// DateTimeUtils.fromJavaDate does not handle null value, so we 
need to check it.
+val dateVal = rs.getDate(pos)
+if (dateVal != null) {
+  DateTimeUtils.fromJavaDate(dateVal)
+} else {
+  null
+}
+
+case DecimalType.Fixed(p, s) =>
+  (rs: ResultSet, pos: Int) =>
+val decimalVal = rs.getBigDecimal(pos)
+if (decimalVal == null) {
--- End diff --

Same as above (plus you're checking against `null` with the branches in the opposite
order to the case above -- so the two aren't consistent).
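
i.e. the `Decimal` case written with the same branch order as the `Date` case above (a sketch based on the diff):

```
case DecimalType.Fixed(p, s) =>
  (rs: ResultSet, pos: Int) =>
    val decimalVal = rs.getBigDecimal(pos)
    if (decimalVal != null) {
      Decimal(decimalVal, p, s)
    } else {
      null
    }
```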





[GitHub] spark pull request #14313: [SPARK-16674][SQL] Avoid per-record type dispatch...

2016-07-23 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14313#discussion_r71977310
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 ---
@@ -322,46 +322,134 @@ private[sql] class JDBCRDD(
 }
   }
 
-  // Each JDBC-to-Catalyst conversion corresponds to a tag defined here so 
that
-  // we don't have to potentially poke around in the Metadata once for 
every
-  // row.
-  // Is there a better way to do this?  I'd rather be using a type that
-  // contains only the tags I define.
-  abstract class JDBCConversion
-  case object BooleanConversion extends JDBCConversion
-  case object DateConversion extends JDBCConversion
-  case class  DecimalConversion(precision: Int, scale: Int) extends 
JDBCConversion
-  case object DoubleConversion extends JDBCConversion
-  case object FloatConversion extends JDBCConversion
-  case object IntegerConversion extends JDBCConversion
-  case object LongConversion extends JDBCConversion
-  case object BinaryLongConversion extends JDBCConversion
-  case object StringConversion extends JDBCConversion
-  case object TimestampConversion extends JDBCConversion
-  case object BinaryConversion extends JDBCConversion
-  case class ArrayConversion(elementConversion: JDBCConversion) extends 
JDBCConversion
+  // A `JDBCConversion` is responsible for converting a value from 
`ResultSet`
+  // to a value in a field for `InternalRow`.
+  private type JDBCConversion = (ResultSet, Int) => Any
+
+  // This `ArrayElementConversion` is responsible for converting elements 
in
+  // an array from `ResultSet`.
+  private type ArrayElementConversion = (Object) => Any
 
   /**
-   * Maps a StructType to a type tag list.
+   * Maps a StructType to conversions for each type.
*/
   def getConversions(schema: StructType): Array[JDBCConversion] =
 schema.fields.map(sf => getConversions(sf.dataType, sf.metadata))
 
   private def getConversions(dt: DataType, metadata: Metadata): 
JDBCConversion = dt match {
-case BooleanType => BooleanConversion
-case DateType => DateConversion
-case DecimalType.Fixed(p, s) => DecimalConversion(p, s)
-case DoubleType => DoubleConversion
-case FloatType => FloatConversion
-case IntegerType => IntegerConversion
-case LongType => if (metadata.contains("binarylong")) 
BinaryLongConversion else LongConversion
-case StringType => StringConversion
-case TimestampType => TimestampConversion
-case BinaryType => BinaryConversion
-case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata))
+case BooleanType =>
+  (rs: ResultSet, pos: Int) => rs.getBoolean(pos)
+
+case DateType =>
+  (rs: ResultSet, pos: Int) =>
+// DateTimeUtils.fromJavaDate does not handle null value, so we 
need to check it.
+val dateVal = rs.getDate(pos)
+if (dateVal != null) {
--- End diff --

`Option(dateVal).map(...).orNull`?
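
A sketch of how that could look for the `DateType` case. One caveat: `DateTimeUtils.fromJavaDate` yields a primitive (days as an `Int`), so the mapped value has to be widened to `Any` for `orNull` to type-check (which matches the `(ResultSet, Int) => Any` alias in the diff):

```
case DateType =>
  (rs: ResultSet, pos: Int) =>
    // Option handles the null date; orNull turns None back into null
    Option(rs.getDate(pos)).map(d => DateTimeUtils.fromJavaDate(d): Any).orNull
```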





[GitHub] spark pull request #14295: [SPARK-16648][SQL] Overrides TreeNode.withNewChil...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14295#discussion_r71908714
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala
 ---
@@ -42,6 +42,17 @@ case class Last(child: Expression, ignoreNullsExpr: 
Expression) extends Declarat
 
   override def children: Seq[Expression] = child :: Nil
 
+  // SPARK-16648: Default `TreeNode.withNewChildren` implementation 
doesn't work for `Last` when
--- End diff --

Make it so?





[GitHub] spark pull request #14309: [SPARK-11977][SQL] Support accessing a column con...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14309#discussion_r7196
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
---
@@ -641,6 +641,10 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 Row(key, value, key + 1)
   }.toSeq)
 assert(df.schema.map(_.name) === Seq("key", "valueRenamed", "newCol"))
+
+// Renaming to a column that contains "." character
+val df2 = testData.toDF().withColumnRenamed("value", "value.Renamed")
--- End diff --

No need for `()` in `toDF()`.





[GitHub] spark pull request #14309: [SPARK-11977][SQL] Support accessing a column con...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14309#discussion_r7137
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
---
@@ -641,6 +641,10 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 Row(key, value, key + 1)
   }.toSeq)
 assert(df.schema.map(_.name) === Seq("key", "valueRenamed", "newCol"))
+
+// Renaming to a column that contains "." character
+val df2 = testData.toDF().withColumnRenamed("value", "value.Renamed")
+assert(df2.schema.map(_.name) === Seq("key", "value.Renamed"))
--- End diff --

`df2.schema.fieldNames`?
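
i.e. (a small sketch -- `fieldNames` is an `Array`, hence the `toSeq`):

```
assert(df2.schema.fieldNames.toSeq === Seq("key", "value.Renamed"))
```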





[GitHub] spark pull request #14309: [SPARK-11977][SQL] Support accessing a column con...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14309#discussion_r71888979
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
---
@@ -641,6 +641,10 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 Row(key, value, key + 1)
   }.toSeq)
 assert(df.schema.map(_.name) === Seq("key", "valueRenamed", "newCol"))
--- End diff --

I'd fix that line too, with `fieldNames`.





[GitHub] spark issue #14307: [SPARK-16672][SQL] SQLBuilder should not raise exception...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on the issue:

https://github.com/apache/spark/pull/14307
  
I didn't even know such a query was possible in SQL :) Lots to learn still.





[GitHub] spark issue #14305: Spark-16669:Adding partition prunning to Metastore stati...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on the issue:

https://github.com/apache/spark/pull/14305
  
I think the PR is completely broken and should be recreated.





[GitHub] spark pull request #14302: [SPARK-16663][SQL] desc table should be consisten...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14302#discussion_r71886372
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala
 ---
@@ -41,13 +41,13 @@ class HiveOperatorQueryableSuite extends QueryTest with 
TestHiveSingleton {
 checkAnswer(
   sql("select * from mydesc"),
   Seq(
-Row("key", "int", null),
-Row("value", "string", null)))
+Row("key", "int", ""),
--- End diff --

Wish there were a "better" variant of `Row`, as we traded `null`s for `""` :(





[GitHub] spark pull request #14302: [SPARK-16663][SQL] desc table should be consisten...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14302#discussion_r71885483
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -436,11 +436,13 @@ case class DescribeTableCommand(table: 
TableIdentifier, isExtended: Boolean, isF
 
   private def describePartitionInfo(table: CatalogTable, buffer: 
ArrayBuffer[Row]): Unit = {
 if (DDLUtils.isDatasourceTable(table)) {
-  val partCols = DDLUtils.getPartitionColumnsFromTableProperties(table)
-  if (partCols.nonEmpty) {
+  val schema = DDLUtils.getSchemaFromTableProperties(table)
+  val partColNames = 
DDLUtils.getPartitionColumnsFromTableProperties(table)
+  if (partColNames.nonEmpty && schema.isDefined) {
 append(buffer, "# Partition Information", "", "")
-append(buffer, s"# ${output.head.name}", "", "")
-partCols.foreach(col => append(buffer, col, "", ""))
+append(buffer, s"# ${output.head.name}", output(1).name, 
output(2).name)
+val partCols = partColNames.map(n => schema.get.find(_.name == 
n).get)
--- End diff --

What do you think about this?

```
val s = schema.get
s.fieldNames.intersect(partColNames).map(s.apply)
```






[GitHub] spark pull request #14295: [SPARK-16648][SQL] Overrides TreeNode.withNewChil...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14295#discussion_r71871902
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala
 ---
@@ -42,6 +42,17 @@ case class Last(child: Expression, ignoreNullsExpr: 
Expression) extends Declarat
 
   override def children: Seq[Expression] = child :: Nil
 
+  // SPARK-16648: Default `TreeNode.withNewChildren` implementation 
doesn't work for `Last` when
+  // both constructor arguments are the same, e.g.:
+  //
+  //   LAST_VALUE(FALSE) // The 2nd argument defaults to FALSE
+  //   LAST_VALUE(FALSE, FALSE)
+  //   LAST_VALUE(TRUE, TRUE)
+  override def withNewChildren(newChildren: Seq[Expression]): Expression = 
{
+val Seq(newChild) = newChildren
--- End diff --

👍 





[GitHub] spark pull request #14295: [SPARK-16648][SQL] Overrides TreeNode.withNewChil...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14295#discussion_r71871696
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala
 ---
@@ -42,6 +42,17 @@ case class Last(child: Expression, ignoreNullsExpr: 
Expression) extends Declarat
 
   override def children: Seq[Expression] = child :: Nil
 
+  // SPARK-16648: Default `TreeNode.withNewChildren` implementation 
doesn't work for `Last` when
--- End diff --

Use `[[` to reference code symbols.





[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r71871490
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---
@@ -247,6 +248,46 @@ private[sql] trait SQLTestUtils
   }
 }
   }
+
+  /** Run a test on a separate [[UninterruptibleThread]]. */
+  protected def testWithUninterruptibleThread(name: String, quietly: 
Boolean = false)
+(body: => Unit): Unit = {
+val timeoutMillis = 1
+var ex: Throwable = null
+
+def runOnThread(): Unit = {
+  val thread = new UninterruptibleThread(s"Testing thread for test 
$name") {
+override def run(): Unit = {
+  try {
+body
+  } catch {
+case NonFatal(e) =>
+  ex = e
+  }
+}
+  }
+  thread.setDaemon(true)
+  thread.start()
+  thread.join(timeoutMillis)
+  if (thread.isAlive) {
+thread.interrupt()
+// If this interrupt does not work, then this thread is most 
likely running something that
+// is not interruptible. There is not much point to wait for the 
thread to termniate, and
+// we rather let the JVM terminate the thread on exit.
+fail(
+  s"Test '$name' running on o.a.s.util.UninterruptibleThread timed 
out after" +
+s" $timeoutMillis ms")
+  } else if (ex != null) {
+throw ex
+  }
+}
+
+if (quietly) {
--- End diff --

I'd appreciate your comment on the following alternative:

```
val f = if (quietly) testQuietly else test
f(name) { runOnThread() }
```

?
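
Since `test` and `testQuietly` have different signatures, the same idea spelled out so it type-checks could be (a sketch, assuming the usual `test(name)(body)` / `testQuietly(name)(body)` shapes):

```
def runAsTest(testName: String)(testBody: => Unit): Unit =
  if (quietly) testQuietly(testName)(testBody) else test(testName)(testBody)

runAsTest(name) { runOnThread() }
```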





[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r71871337
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---
@@ -247,6 +248,46 @@ private[sql] trait SQLTestUtils
   }
 }
   }
+
+  /** Run a test on a separate [[UninterruptibleThread]]. */
+  protected def testWithUninterruptibleThread(name: String, quietly: 
Boolean = false)
+(body: => Unit): Unit = {
+val timeoutMillis = 1
+var ex: Throwable = null
+
+def runOnThread(): Unit = {
+  val thread = new UninterruptibleThread(s"Testing thread for test 
$name") {
+override def run(): Unit = {
+  try {
+body
+  } catch {
+case NonFatal(e) =>
+  ex = e
--- End diff --

Will it work?! You're on another thread here and closing over `ex`.
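
If the visibility of `ex` turns out to be a concern, one way to make the hand-off explicit (a sketch reusing the names from the diff, not what the PR does -- `thread.join` already gives the usual happens-before, so this is belt and braces):

```
import java.util.concurrent.atomic.AtomicReference

val ex = new AtomicReference[Throwable](null)
// inside UninterruptibleThread.run():
//   catch { case NonFatal(e) => ex.set(e) }  // publish the failure to the test thread
thread.join(timeoutMillis)
Option(ex.get()).foreach(e => throw e)
```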





[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r71871037
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -269,19 +273,11 @@ class StreamExecution(
* batchId counter is incremented and a new log entry is written with 
the newest offsets.
*/
   private def constructNextBatch(): Unit = {
-// There is a potential dead-lock in Hadoop "Shell.runCommand" before 
2.5.0 (HADOOP-10622).
-// If we interrupt some thread running Shell.runCommand, we may hit 
this issue.
-// As "FileStreamSource.getOffset" will create a file using HDFS API 
and call "Shell.runCommand"
-// to set the file permission, we should not interrupt 
"microBatchThread" when running this
-// method. See SPARK-14131.
-//
 // Check to see what new data is available.
 val hasNewData = {
   awaitBatchLock.lock()
   try {
-val newData = microBatchThread.runUninterruptibly {
-  uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-}
+val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> 
o))
--- End diff --

Just a single line but takes a while to figure out what it does. I'd 
rewrite it to:

```
uniqueSources.map(s => (s, s.getOffset))...
```

and would do more transformations depending on the types (didn't check in an IDE).
Just an idea to untangle the knots :)
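
Spelled out a bit more, the same line could read (a sketch, assuming `getOffset` returns an `Option`, which the `flatMap` above implies):

```
val newData = uniqueSources
  .map(s => s -> s.getOffset)                                   // ask every source once
  .collect { case (source, Some(offset)) => source -> offset }  // keep only sources with a new offset
```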





[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r71870635
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -91,18 +92,30 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: 
SparkSession, path: String)
 serializer.deserialize[T](ByteBuffer.wrap(bytes))
   }
 
+  /**
+   * Store the metadata for the specified batchId and return `true` if 
successful. If the batchId's
+   * metadata has already been stored, this method will return `false`.
+   *
+   * Note that this method must be called on a 
[[org.apache.spark.util.UninterruptibleThread]]
+   * so that interrupts can be disabled while writing the batch file. This 
is because there is a
+   * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 
(HADOOP-10622). If the thread
+   * running "Shell.runCommand" is interrupted, then the thread can get 
deadlocked. In our
+   * case, `writeBatch` creates a file using HDFS API and calls 
"Shell.runCommand" to set the
+   * file permissions, and can get deadlocked is the stream execution 
thread is stopped by
+   * interrupt. Hence, we make sure that this method is called on 
UninterruptibleThread which
--- End diff --

`[[org.apache.spark.util.UninterruptibleThread]]` (as you do below)





[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r71870509
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -91,18 +92,30 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: 
SparkSession, path: String)
 serializer.deserialize[T](ByteBuffer.wrap(bytes))
   }
 
+  /**
+   * Store the metadata for the specified batchId and return `true` if 
successful. If the batchId's
+   * metadata has already been stored, this method will return `false`.
+   *
+   * Note that this method must be called on a 
[[org.apache.spark.util.UninterruptibleThread]]
+   * so that interrupts can be disabled while writing the batch file. This 
is because there is a
+   * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 
(HADOOP-10622). If the thread
+   * running "Shell.runCommand" is interrupted, then the thread can get 
deadlocked. In our
+   * case, `writeBatch` creates a file using HDFS API and calls 
"Shell.runCommand" to set the
+   * file permissions, and can get deadlocked is the stream execution 
thread is stopped by
+   * interrupt. Hence, we make sure that this method is called on 
UninterruptibleThread which
+   * allows use disable interrupts. Also see SPARK-14131.
--- End diff --

"allow use disable interrupts"? Is this ok?





[GitHub] spark pull request #14292: [SPARK-14131][SQL[STREAMING] Improved fix for avo...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14292#discussion_r71870459
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -91,18 +92,30 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: 
SparkSession, path: String)
 serializer.deserialize[T](ByteBuffer.wrap(bytes))
   }
 
+  /**
+   * Store the metadata for the specified batchId and return `true` if 
successful. If the batchId's
+   * metadata has already been stored, this method will return `false`.
+   *
+   * Note that this method must be called on a 
[[org.apache.spark.util.UninterruptibleThread]]
+   * so that interrupts can be disabled while writing the batch file. This 
is because there is a
+   * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 
(HADOOP-10622). If the thread
+   * running "Shell.runCommand" is interrupted, then the thread can get 
deadlocked. In our
+   * case, `writeBatch` creates a file using HDFS API and calls 
"Shell.runCommand" to set the
+   * file permissions, and can get deadlocked is the stream execution 
thread is stopped by
--- End diff --

s/is/if





[GitHub] spark pull request #14304: [SPARK-16668][TEST] Test parquet reader for row g...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14304#discussion_r71844762
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
 ---
@@ -78,4 +78,30 @@ class ParquetEncodingSuite extends 
ParquetCompatibilityTest with SharedSQLContex
   }}
 }
   }
+
+  test("Read row group containing both dictionary and plain encoded 
pages") {
+spark.conf.set("parquet.dictionary.page.size", "2048")
+spark.conf.set("parquet.page.size", "4096")
+
+withTempPath { dir =>
+  // In order to explicitly test for SPARK-14217, we set the parquet 
dictionary and page size
+  // such that the following data spans across 3 pages (within a 
single row group) where the
+  // first page is dictionary encoded and the remaining two are plain 
encoded.
+  val data = (0 until 512).flatMap(i => Seq.fill(3)(i.toString))
+  data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
+  val file =
+
SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head.asInstanceOf[String]
+
+  val reader = new VectorizedParquetRecordReader
+  reader.initialize(file, null /* set columns to null to project all 
columns */)
+  val column = reader.resultBatch().column(0)
+  assert(reader.nextBatch())
+
+  (0 until 512).foreach { i =>
+assert(column.getUTF8String(3 * i).toString == i.toString)
--- End diff --

What about `toInt` as follows:

```
assert(column.getUTF8String(3 * i).toInt == i)
```





[GitHub] spark pull request #14304: [SPARK-16668][TEST] Test parquet reader for row g...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14304#discussion_r71844632
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
 ---
@@ -78,4 +78,30 @@ class ParquetEncodingSuite extends 
ParquetCompatibilityTest with SharedSQLContex
   }}
 }
   }
+
+  test("Read row group containing both dictionary and plain encoded 
pages") {
+spark.conf.set("parquet.dictionary.page.size", "2048")
+spark.conf.set("parquet.page.size", "4096")
+
+withTempPath { dir =>
+  // In order to explicitly test for SPARK-14217, we set the parquet 
dictionary and page size
+  // such that the following data spans across 3 pages (within a 
single row group) where the
+  // first page is dictionary encoded and the remaining two are plain 
encoded.
+  val data = (0 until 512).flatMap(i => Seq.fill(3)(i.toString))
+  data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
+  val file =
+
SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head.asInstanceOf[String]
+
+  val reader = new VectorizedParquetRecordReader
+  reader.initialize(file, null /* set columns to null to project all 
columns */)
--- End diff --

I meant `initialize(file, columns = null)` or even:

```
val projectAllColumns = null
initialize(file, projectAllColumns)
```

That way you encode your intention in the code itself (without extra comments).





[GitHub] spark issue #14315: [HOTFIX][BUILD][SPARK-16287][SQL] Fix annotation argumen...

2016-07-22 Thread jaceklaskowski
Github user jaceklaskowski commented on the issue:

https://github.com/apache/spark/pull/14315
  
I'm on Java 8. I bet you are not. True?

Also, I remembered how to fix it since this kind of error has happened a few times in the past.





[GitHub] spark issue #14315: [HOTFIX][BUILD][SPARK-16287][SQL] Fix annotation argumen...

2016-07-21 Thread jaceklaskowski
Github user jaceklaskowski commented on the issue:

https://github.com/apache/spark/pull/14315
  
/cc @cloud-fan @techaddict 





[GitHub] spark pull request #14315: [HOTFIX] Fix annotation argument needs to be a co...

2016-07-21 Thread jaceklaskowski
GitHub user jaceklaskowski opened a pull request:

https://github.com/apache/spark/pull/14315

[HOTFIX] Fix annotation argument needs to be a constant

## What changes were proposed in this pull request?

Fix for compilation error:

```

/Users/jacek/dev/oss/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala:402:
 error: annotation argument needs to be a constant; found: "_FUNC_(text[, 
pairDelim, keyValueDelim]) - Creates a map after splitting the text ".+("into 
key/value pairs using delimiters. ").+("Default delimiters are \',\' for 
pairDelim and \':\' for keyValueDelim.")
"into key/value pairs using delimiters. " +
  ^
```

## How was this patch tested?

Local build


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jaceklaskowski/spark 
build-fix-complexTypeCreator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14315.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14315


commit ee34380a532ca8b5df5f938a0eba51c94a973dff
Author: Jacek Laskowski <ja...@japila.pl>
Date:   2016-07-22T05:24:08Z

[HOTFIX] Fix annotation argument needs to be a constant







[GitHub] spark pull request #14284: [SPARK-16633] [SPARK-16642] Fixes three issues re...

2016-07-21 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14284#discussion_r71781038
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 ---
@@ -357,14 +356,59 @@ class SQLWindowFunctionSuite extends QueryTest with 
SQLTestUtils with TestHiveSi
   }
 
   test("SPARK-7595: Window will cause resolve failed with self join") {
-sql("SELECT * FROM src") // Force loading of src table.
-
 checkAnswer(sql(
   """
 |with
-| v1 as (select key, count(value) over (partition by key) cnt_val 
from src),
+| v0 as (select 0 as key, 1 as value),
+| v1 as (select key, count(value) over (partition by key) cnt_val 
from v0),
 | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where 
v1.key = v1_lag.key)
-| select * from v2 order by key limit 1
-  """.stripMargin), Row(0, 3))
+| select key, cnt_val from v2 order by key limit 1
+  """.stripMargin), Row(0, 1))
+  }
+
+  test("lead/lag should return the default value if the offset row does 
not exist") {
+checkAnswer(sql(
+  """
+|SELECT
+|  lag(123, 100, 321) OVER (ORDER BY id) as lag,
+|  lead(123, 100, 321) OVER (ORDER BY id) as lead
+|FROM (SELECT 1 as id) tmp
+  """.stripMargin),
+  Row(321, 321))
+
+checkAnswer(sql(
+  """
+|SELECT
+|  lag(123, 100, a) OVER (ORDER BY id) as lag,
+|  lead(123, 100, a) OVER (ORDER BY id) as lead
+|FROM (SELECT 1 as id, 2 as a) tmp
+  """.stripMargin),
+  Row(2, 2))
+  }
+
+  test("lead/lag should be able to handle null input value correctly") {
--- End diff --

I don't think "correctly" is needed here.





[GitHub] spark pull request #14304: [SPARK-16668][TEST] Test parquet reader for row g...

2016-07-21 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14304#discussion_r71780859
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
 ---
@@ -78,4 +78,29 @@ class ParquetEncodingSuite extends 
ParquetCompatibilityTest with SharedSQLContex
   }}
 }
   }
+
+  test("Read row group containing both dictionary and plain encoded 
pages") {
+spark.conf.set("parquet.dictionary.page.size", "2048")
+spark.conf.set("parquet.page.size", "4096")
+
+withTempPath { dir =>
+  // In order to explicitly test for SPARK-14217, we set the parquet 
dictionary and page size
+  // such that the following data spans across 3 pages (within a 
single row group) where the
+  // first page is dictionary encoded and the remaining two are plain 
encoded.
+  val data = (0 until 512).flatMap(i => List(i.toString, i.toString, 
i.toString))
--- End diff --

What do you think about `Seq.fill(3)(i.toString)`?





[GitHub] spark pull request #14285: [SPARK-16649][SQL] Push partition predicates down...

2016-07-21 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14285#discussion_r71776877
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -632,6 +632,23 @@ class SessionCatalog(
   }
 
   /**
+   * Returns partitions filtered by predicates for the given table, It 
just work for Hive.
--- End diff --

Why do you copy the scaladoc?





[GitHub] spark pull request #14285: [SPARK-16649][SQL] Push partition predicates down...

2016-07-21 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14285#discussion_r71776725
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -632,6 +632,23 @@ class SessionCatalog(
   }
 
   /**
+   * Returns partitions filtered by predicates for the given table, It 
just work for Hive.
--- End diff --

Same as above





[GitHub] spark pull request #14285: [SPARK-16649][SQL] Push partition predicates down...

2016-07-21 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14285#discussion_r71776630
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 ---
@@ -167,6 +168,21 @@ abstract class ExternalCatalog {
   table: String,
   partialSpec: Option[TablePartitionSpec] = None): 
Seq[CatalogTablePartition]
 
+  /**
+   * Returns partitions filtered by predicates for the given table, It 
just work for Hive.
+   *
+   * The filters Expressions may optionally be provided to filter the 
partitions returned.
+   * For instance, if there exist partitions (a='1', b='2'), (a='1', 
b='3') and (a='2', b='4'),
+   * then the filters (a='1') will return the first two only.
+   * @param db database name
--- End diff --

New line before `@param`?





[GitHub] spark pull request #14285: [SPARK-16649][SQL] Push partition predicates down...

2016-07-21 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14285#discussion_r71776459
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 ---
@@ -167,6 +168,21 @@ abstract class ExternalCatalog {
   table: String,
   partialSpec: Option[TablePartitionSpec] = None): 
Seq[CatalogTablePartition]
 
+  /**
+   * Returns partitions filtered by predicates for the given table, It 
just work for Hive.
--- End diff --

". It just works" (i.e. start a new sentence and fix the verb form).





[GitHub] spark pull request #14304: [SPARK-16668][TEST] Test parquet reader for row g...

2016-07-21 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14304#discussion_r71774436
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
 ---
@@ -78,4 +78,29 @@ class ParquetEncodingSuite extends 
ParquetCompatibilityTest with SharedSQLContex
   }}
 }
   }
+
+  test("Read row group containing both dictionary and plain encoded 
pages") {
+spark.conf.set("parquet.dictionary.page.size", "2048")
+spark.conf.set("parquet.page.size", "4096")
+
+withTempPath { dir =>
+  // In order to explicitly test for SPARK-14217, we set the parquet 
dictionary and page size
+  // such that the following data spans across 3 pages (within a 
single row group) where the
+  // first page is dictionary encoded and the remaining two are plain 
encoded.
+  val data = (0 until 512).flatMap(i => List(i.toString, i.toString, 
i.toString))
+  data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
+  val file = 
SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
+
+  val reader = new VectorizedParquetRecordReader
+  reader.initialize(file.asInstanceOf[String], null)
+  val batch = reader.resultBatch()
+  assert(reader.nextBatch())
+
+  (0 until 512).foreach { i =>
+assert(batch.column(0).getUTF8String(3 * i).toString == i.toString)
--- End diff --

Two things here:

1. Pull `batch.column(0)` out into a val (around line 96) so it is not looked up on every iteration.
2. Since you convert with `toString` anyway, what do you think about comparing via `toInt` instead (since `i` is an `Int`)? One conversion less :)
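
A sketch of what I mean; `column` is just a name I made up for the extracted value:

```scala
val column = batch.column(0)
(0 until 512).foreach { i =>
  // parse the read value once instead of also building `i.toString` on every iteration
  assert(column.getUTF8String(3 * i).toString.toInt == i)
}
```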





[GitHub] spark pull request #14304: [SPARK-16668][TEST] Test parquet reader for row g...

2016-07-21 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14304#discussion_r71773933
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
 ---
@@ -78,4 +78,29 @@ class ParquetEncodingSuite extends 
ParquetCompatibilityTest with SharedSQLContex
   }}
 }
   }
+
+  test("Read row group containing both dictionary and plain encoded 
pages") {
+spark.conf.set("parquet.dictionary.page.size", "2048")
+spark.conf.set("parquet.page.size", "4096")
+
+withTempPath { dir =>
+  // In order to explicitly test for SPARK-14217, we set the parquet 
dictionary and page size
+  // such that the following data spans across 3 pages (within a 
single row group) where the
+  // first page is dictionary encoded and the remaining two are plain 
encoded.
+  val data = (0 until 512).flatMap(i => List(i.toString, i.toString, 
i.toString))
+  data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
+  val file = 
SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
+
+  val reader = new VectorizedParquetRecordReader
+  reader.initialize(file.asInstanceOf[String], null)
--- End diff --

What do you think about moving this `asInstanceOf` to line 92 and using a named parameter for the `null`?
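
Roughly this; the inline comment stands in for the named argument since I have not checked the actual parameter name:

```scala
val file = SpecificParquetRecordReaderBase
  .listDirectory(dir).toArray.head.asInstanceOf[String]

val reader = new VectorizedParquetRecordReader
reader.initialize(file, /* requested columns = */ null)
```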





[GitHub] spark pull request #14207: [SPARK-16552] [SQL] [WIP] Store the Inferred Sche...

2016-07-17 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14207#discussion_r71083334
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---
@@ -351,6 +353,44 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   }
 
   /**
+   * Refresh the inferred schema stored in the external catalog for data 
source tables.
+   */
+  private def refreshInferredSchema(tableIdent: TableIdentifier): Unit = {
+val table = sessionCatalog.getTableMetadataOption(tableIdent)
+table.foreach { tableDesc =>
+  if (DDLUtils.isDatasourceTable(tableDesc) && 
DDLUtils.isSchemaInferred(tableDesc)) {
+val partitionColumns = 
DDLUtils.getPartitionColumnsFromTableProperties(tableDesc)
+val bucketSpec = 
DDLUtils.getBucketSpecFromTableProperties(tableDesc)
+val dataSource =
+  DataSource(
+sparkSession,
+userSpecifiedSchema = None,
+partitionColumns = partitionColumns,
+bucketSpec = bucketSpec,
+className = 
tableDesc.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
+options = tableDesc.storage.serdeProperties)
+.resolveRelation().asInstanceOf[HadoopFsRelation]
+
+val schemaProperties = new mutable.HashMap[String, String]
+CreateDataSourceTableUtils.saveSchema(
+  sparkSession, dataSource.schema, 
dataSource.partitionSchema.fieldNames, schemaProperties)
+
+val tablePropertiesWithoutSchema = tableDesc.properties.filterKeys 
{ k =>
+  // Keep the properties that are not for schema or partition 
columns
+  k != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS &&
--- End diff --

It's hard to tell what the code is doing inside `filterKeys` -- consider extracting the predicate into a function with a descriptive name.
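
Something along these lines, as a sketch (the helper name is made up, and the other schema/partition keys from the original condition would go in the same place):

```scala
def isNotSchemaOrPartitionProperty(key: String): Boolean = {
  // keep the properties that are not for the schema or partition columns
  key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS // && the remaining schema keys
}

val tablePropertiesWithoutSchema =
  tableDesc.properties.filterKeys(isNotSchemaOrPartitionProperty)
```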





[GitHub] spark pull request #14122: [SPARK-16470][ML][Optimizer] Check linear regress...

2016-07-17 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14122#discussion_r71083220
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -327,6 +327,11 @@ class LinearRegression @Since("1.3.0") 
(@Since("1.3.0") override val uid: String
 throw new SparkException(msg)
   }
 
+  if (!state.actuallyConverged) {
+logWarning("LinearRegression training fininshed but the result " +
--- End diff --

@srowen There's a typo in `fininshed` :(





[GitHub] spark pull request #14207: [SPARK-16552] [SQL] [WIP] Store the Inferred Sche...

2016-07-17 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14207#discussion_r71083304
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -487,6 +487,10 @@ object DDLUtils {
 isDatasourceTable(table.properties)
   }
 
+  def isSchemaInferred(table: CatalogTable): Boolean = {
+table.properties.get(DATASOURCE_SCHEMA_TYPE) == 
Option(SchemaType.INFERRED.name)
--- End diff --

Consider `contains` instead of comparing against an `Option`.
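
i.e. the same expression as in the diff, just with `contains`:

```scala
table.properties.get(DATASOURCE_SCHEMA_TYPE).contains(SchemaType.INFERRED.name)
```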





[GitHub] spark pull request #14210: [SPARK-16556] [SPARK-16559] [SQL] Fix Two Bugs in...

2016-07-16 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14210#discussion_r71072892
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
---
@@ -1264,6 +1265,29 @@ class DDLSuite extends QueryTest with 
SharedSQLContext with BeforeAndAfterEach {
 }
   }
 
+  test("create table using cluster by without schema specification") {
--- End diff --

s/cluster/clustered? Or even the uppercase variant `CLUSTERED BY`, as in the other test descriptions.





[GitHub] spark pull request #14210: [SPARK-16556] [SPARK-16559] [SQL] Fix Two Bugs in...

2016-07-16 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14210#discussion_r71072862
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
 ---
@@ -199,7 +200,7 @@ class CreateTableAsSelectSuite extends DataSourceTest 
with SharedSQLContext with
 }
   }
 
-  test("create table using as select - with bucket") {
+  test("create table using as select - with non-zero bucket") {
--- End diff --

s/bucket/buckets?





[GitHub] spark pull request #14210: [SPARK-16556] [SPARK-16559] [SQL] Fix Two Bugs in...

2016-07-16 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14210#discussion_r71072854
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
 ---
@@ -212,7 +213,23 @@ class CreateTableAsSelectSuite extends DataSourceTest 
with SharedSQLContext with
   )
   val table = catalog.getTableMetadata(TableIdentifier("t"))
   assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
-Some(BucketSpec(5, Seq("a"), Seq("b"
+Option(BucketSpec(5, Seq("a"), Seq("b"
+}
+  }
+
+  test("create table using as select - with zero bucket") {
--- End diff --

s/bucket/buckets?





[GitHub] spark pull request #14210: [SPARK-16556] [SPARK-16559] [SQL] Fix Two Bugs in...

2016-07-16 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14210#discussion_r71072842
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
---
@@ -1264,6 +1265,29 @@ class DDLSuite extends QueryTest with 
SharedSQLContext with BeforeAndAfterEach {
 }
   }
 
+  test("create table using cluster by without schema specification") {
+import testImplicits._
+withTempPath { tempDir =>
+  withTable("jsonTable") {
+(("a", "b") :: 
Nil).toDF().toJSON.rdd.saveAsTextFile(tempDir.getCanonicalPath)
--- End diff --

Why don't you use `toDF.write.json` instead?
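
i.e. something like (untested sketch):

```scala
(("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
```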





[GitHub] spark pull request #14217: [SPARK-16562][SQL] Do not allow downcast in INT32...

2016-07-16 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14217#discussion_r71072773
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 ---
@@ -169,6 +169,19 @@ class ParquetIOSuite extends QueryTest with 
ParquetTest with SharedSQLContext {
 }
   }
 
+  test("SPARK-16562 Do not allow downcast in INT32 based types for normal 
Parquet reader") {
+withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+  withTempPath { file =>
+(1 to 
4).map(Tuple1(_)).toDF("a").write.parquet(file.getAbsolutePath)
--- End diff --

Why do you `map(Tuple1(_))`? Why not `(1 to 4).toDF("a")` instead?
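
i.e. (untested sketch):

```scala
(1 to 4).toDF("a").write.parquet(file.getAbsolutePath)
```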





[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13526#discussion_r69986596
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
---
@@ -312,6 +312,17 @@ class DatasetSuite extends QueryTest with 
SharedSQLContext {
   "a", "30", "b", "3", "c", "1")
   }
 
+  test("groupBy function, mapValues, flatMap") {
+val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
--- End diff --

Just `.toDS`? (no parentheses)





[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13526#discussion_r69986532
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -65,6 +65,46 @@ class KeyValueGroupedDataset[K, V] private[sql](
   groupingAttributes)
 
   /**
+   * Returns a new [[KeyValueGroupedDataset]] where the given function has 
been applied to the
+   * data. The grouping key is unchanged by this.
+   *
+   * {{{
+   *   // Create values grouped by key from a Dataset[(K, V)]
+   *   ds.groupByKey(_._1).mapValues(_._2) // Scala
+   * }}}
+   * @since 2.0.0
+   */
+  def mapValues[W: Encoder](func: V => W): KeyValueGroupedDataset[K, W] = {
+val withNewData = AppendColumns(func, dataAttributes, logicalPlan)
+val projected = Project(withNewData.newColumns ++ groupingAttributes, 
withNewData)
+val executed = sparkSession.sessionState.executePlan(projected)
+
+new KeyValueGroupedDataset(
+  encoderFor[K],
+  encoderFor[W],
+  executed,
+  withNewData.newColumns,
+  groupingAttributes)
+  }
+
+  /**
+   * Returns a new [[KeyValueGroupedDataset]] where the given function has 
been applied to the
+   * data. The grouping key is unchanged by this.
+   *
+   * {{{
+   *   // Create Integer values grouped by String key from a 
Dataset<Tuple2<String, Integer>>
+   *   Dataset<Tuple2<String, Integer>> ds = ...;
+   *   KeyValueGroupedDataset<String, Integer> grouped =
+   * ds.groupByKey(t -> t._1, Encoders.STRING()).mapValues(t -> t._2, 
Encoders.INT()); // Java 8
+   * }}}
+   * @since 2.0.0
+   */
+  def mapValues[W](func: MapFunction[V, W], encoder: Encoder[W]): 
KeyValueGroupedDataset[K, W] = {
+implicit val uEnc = encoder
+mapValues{ (v: V) => func.call(v) }
--- End diff --

A space before `{`?





[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13526#discussion_r69986479
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -65,6 +65,46 @@ class KeyValueGroupedDataset[K, V] private[sql](
   groupingAttributes)
 
   /**
+   * Returns a new [[KeyValueGroupedDataset]] where the given function has 
been applied to the
+   * data. The grouping key is unchanged by this.
+   *
+   * {{{
+   *   // Create values grouped by key from a Dataset[(K, V)]
+   *   ds.groupByKey(_._1).mapValues(_._2) // Scala
+   * }}}
+   * @since 2.0.0
+   */
+  def mapValues[W: Encoder](func: V => W): KeyValueGroupedDataset[K, W] = {
+val withNewData = AppendColumns(func, dataAttributes, logicalPlan)
+val projected = Project(withNewData.newColumns ++ groupingAttributes, 
withNewData)
+val executed = sparkSession.sessionState.executePlan(projected)
+
+new KeyValueGroupedDataset(
+  encoderFor[K],
+  encoderFor[W],
+  executed,
+  withNewData.newColumns,
+  groupingAttributes)
+  }
+
+  /**
+   * Returns a new [[KeyValueGroupedDataset]] where the given function has 
been applied to the
+   * data. The grouping key is unchanged by this.
+   *
+   * {{{
+   *   // Create Integer values grouped by String key from a 
Dataset<Tuple2<String, Integer>>
+   *   Dataset<Tuple2<String, Integer>> ds = ...;
+   *   KeyValueGroupedDataset<String, Integer> grouped =
+   * ds.groupByKey(t -> t._1, Encoders.STRING()).mapValues(t -> t._2, 
Encoders.INT()); // Java 8
+   * }}}
+   * @since 2.0.0
--- End diff --

A new line before `@since`?





[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13526#discussion_r69986420
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -65,6 +65,46 @@ class KeyValueGroupedDataset[K, V] private[sql](
   groupingAttributes)
 
   /**
+   * Returns a new [[KeyValueGroupedDataset]] where the given function has 
been applied to the
+   * data. The grouping key is unchanged by this.
+   *
+   * {{{
+   *   // Create values grouped by key from a Dataset[(K, V)]
+   *   ds.groupByKey(_._1).mapValues(_._2) // Scala
+   * }}}
+   * @since 2.0.0
+   */
+  def mapValues[W: Encoder](func: V => W): KeyValueGroupedDataset[K, W] = {
+val withNewData = AppendColumns(func, dataAttributes, logicalPlan)
+val projected = Project(withNewData.newColumns ++ groupingAttributes, 
withNewData)
+val executed = sparkSession.sessionState.executePlan(projected)
+
+new KeyValueGroupedDataset(
+  encoderFor[K],
+  encoderFor[W],
+  executed,
+  withNewData.newColumns,
+  groupingAttributes)
+  }
+
+  /**
+   * Returns a new [[KeyValueGroupedDataset]] where the given function has 
been applied to the
--- End diff --

...with the given function `func` applied to the data?





[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13526#discussion_r69986245
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -65,6 +65,46 @@ class KeyValueGroupedDataset[K, V] private[sql](
   groupingAttributes)
 
   /**
+   * Returns a new [[KeyValueGroupedDataset]] where the given function has 
been applied to the
+   * data. The grouping key is unchanged by this.
+   *
+   * {{{
+   *   // Create values grouped by key from a Dataset[(K, V)]
+   *   ds.groupByKey(_._1).mapValues(_._2) // Scala
+   * }}}
+   * @since 2.0.0
+   */
+  def mapValues[W: Encoder](func: V => W): KeyValueGroupedDataset[K, W] = {
--- End diff --

...while here `W: Encoder` has a space only after the `:`. Why the inconsistency?





[GitHub] spark pull request #13526: [SPARK-15780][SQL] Support mapValues on KeyValueG...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13526#discussion_r69986179
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -175,6 +175,17 @@ object AppendColumns {
   encoderFor[U].namedExpressions,
   child)
   }
+
+  def apply[T : Encoder, U : Encoder](
--- End diff --

Here you use `T : Encoder`, i.e. with spaces before and after `:` while...





[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14087#discussion_r69985379
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -331,6 +331,24 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read from textfile") {
+withTempDirs { case (src, tmp) =>
+  val textStream = spark.readStream.textFile(src.getCanonicalPath)
+  val filtered = textStream.filter($"value" contains "keep")
+
+  testStream(filtered)(
+AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+CheckAnswer("keep2", "keep3"),
+StopStream,
+AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+StartStream(),
--- End diff --

Just wondering why `StartStream()` has the `()` here while `StopStream` does not?





[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14087#discussion_r69985100
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -281,6 +281,31 @@ final class DataStreamReader 
private[sql](sparkSession: SparkSession) extends Lo
   @Experimental
   def text(path: String): DataFrame = format("text").load(path)
 
+  /**
+   * Loads text files and returns a [[Dataset]] of String. The underlying 
schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * If the directory structure of the text files contains partitioning 
information, those are
+   * ignored in the resulting Dataset. To include partitioning information 
as columns, use `text`.
+   *
+   * Each line in the text files is a new element in the resulting 
Dataset. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.textFile("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.read().textFile("/path/to/spark/README.md")
+   * }}}
+   *
+   * @param path input path
+   * @since 2.0.0
+   */
+  def textFile(path: String): Dataset[String] = {
+if (userSpecifiedSchema.nonEmpty) {
+  throw new AnalysisException("User specified schema not supported 
with `textFile`")
+}
+
text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
--- End diff --

I'm surprised that `sparkSession.implicits.newStringEncoder` is required here. Why not import `sparkSession.implicits._` instead?
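
A sketch of the import-based variant (assuming nothing in `DataStreamReader` gets in the way of the import):

```scala
import sparkSession.implicits._
text(path).select("value").as[String]
```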





[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14087#discussion_r69985212
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -281,6 +281,31 @@ final class DataStreamReader 
private[sql](sparkSession: SparkSession) extends Lo
   @Experimental
   def text(path: String): DataFrame = format("text").load(path)
 
+  /**
+   * Loads text files and returns a [[Dataset]] of String. The underlying 
schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * If the directory structure of the text files contains partitioning 
information, those are
+   * ignored in the resulting Dataset. To include partitioning information 
as columns, use `text`.
+   *
+   * Each line in the text files is a new element in the resulting 
Dataset. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.textFile("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.read().textFile("/path/to/spark/README.md")
--- End diff --

s/read/readStream?





[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14087#discussion_r69985195
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -281,6 +281,31 @@ final class DataStreamReader 
private[sql](sparkSession: SparkSession) extends Lo
   @Experimental
   def text(path: String): DataFrame = format("text").load(path)
 
+  /**
+   * Loads text files and returns a [[Dataset]] of String. The underlying 
schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * If the directory structure of the text files contains partitioning 
information, those are
+   * ignored in the resulting Dataset. To include partitioning information 
as columns, use `text`.
+   *
+   * Each line in the text files is a new element in the resulting 
Dataset. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.textFile("/path/to/spark/README.md")
--- End diff --

s/read/readStream?





[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14087#discussion_r69984805
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -281,6 +281,31 @@ final class DataStreamReader 
private[sql](sparkSession: SparkSession) extends Lo
   @Experimental
   def text(path: String): DataFrame = format("text").load(path)
 
+  /**
+   * Loads text files and returns a [[Dataset]] of String. The underlying 
schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * If the directory structure of the text files contains partitioning 
information, those are
+   * ignored in the resulting Dataset. To include partitioning information 
as columns, use `text`.
+   *
+   * Each line in the text files is a new element in the resulting 
Dataset. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.textFile("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.read().textFile("/path/to/spark/README.md")
+   * }}}
+   *
+   * @param path input path
+   * @since 2.0.0
+   */
+  def textFile(path: String): Dataset[String] = {
+if (userSpecifiedSchema.nonEmpty) {
+  throw new AnalysisException("User specified schema not supported 
with `textFile`")
--- End diff --

user-specified





[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14087#discussion_r69984678
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -281,6 +281,31 @@ final class DataStreamReader 
private[sql](sparkSession: SparkSession) extends Lo
   @Experimental
   def text(path: String): DataFrame = format("text").load(path)
 
+  /**
+   * Loads text files and returns a [[Dataset]] of String. The underlying 
schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * If the directory structure of the text files contains partitioning 
information, those are
+   * ignored in the resulting Dataset. To include partitioning information 
as columns, use `text`.
+   *
+   * Each line in the text files is a new element in the resulting 
Dataset. For example:
--- End diff --

s/element/record?





[GitHub] spark pull request #14087: [SPARK-16411][SQL][STREAMING] Add textFile to Str...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14087#discussion_r69984584
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -281,6 +281,31 @@ final class DataStreamReader 
private[sql](sparkSession: SparkSession) extends Lo
   @Experimental
   def text(path: String): DataFrame = format("text").load(path)
 
+  /**
+   * Loads text files and returns a [[Dataset]] of String. The underlying 
schema of the Dataset
--- End diff --

a text file?





[GitHub] spark pull request #14083: [SPARK-16406][SQL] Improve performance of Logical...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14083#discussion_r69982767
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 ---
@@ -165,111 +169,99 @@ abstract class LogicalPlan extends 
QueryPlan[LogicalPlan] with Logging {
   def resolveQuoted(
   name: String,
   resolver: Resolver): Option[NamedExpression] = {
-resolve(UnresolvedAttribute.parseAttributeName(name), output, resolver)
+
outputAttributeResolver.resolve(UnresolvedAttribute.parseAttributeName(name), 
resolver)
   }
 
   /**
-   * Resolve the given `name` string against the given attribute, 
returning either 0 or 1 match.
-   *
-   * This assumes `name` has multiple parts, where the 1st part is a 
qualifier
-   * (i.e. table name, alias, or subquery alias).
-   * See the comment above `candidates` variable in resolve() for 
semantics the returned data.
+   * Refreshes (or invalidates) any metadata/data cached in the plan 
recursively.
*/
-  private def resolveAsTableColumn(
-  nameParts: Seq[String],
-  resolver: Resolver,
-  attribute: Attribute): Option[(Attribute, List[String])] = {
-assert(nameParts.length > 1)
-if (attribute.qualifier.exists(resolver(_, nameParts.head))) {
-  // At least one qualifier matches. See if remaining parts match.
-  val remainingParts = nameParts.tail
-  resolveAsColumn(remainingParts, resolver, attribute)
-} else {
-  None
-}
+  def refresh(): Unit = children.foreach(_.refresh())
+}
+
+/**
+ * Helper class for (LogicalPlan) attribute resolution. This class indexes 
attributes by their
+ * case-in-sensitive name, and checks potential candidates using the given 
Resolver. Both qualified
--- End diff --

case-insensitive? When you say "the given Resolver", what do you mean by 
"Resolver"? Can we link to the type?





[GitHub] spark pull request #14080: [SPARK-16405] Add metrics and source for external...

2016-07-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14080#discussion_r69981791
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
 ---
@@ -143,4 +179,26 @@ private void checkAuth(TransportClient client, String 
appId) {
 }
   }
 
+  /**
+   * A simple class to wrap all shuffle service wrapper metrics
+   */
+  private class ShuffleMetrics implements MetricSet {
+private final Map<String, Metric> allMetrics;
+private final Timer timeDelayForOpenBlockRequest = new Timer();
+private final Timer timeDelayForRegisterExecutorRequest = new Timer();
+private final Meter transferBlockRate = new Meter();
+
+private ShuffleMetrics() {
+  allMetrics = new HashMap<>();
--- End diff --

Will it work with Java 7? I think Spark 2.0 will keep supporting that version.





[GitHub] spark pull request #13981: [SPARK-16307] [ML] Add test to verify the predict...

2016-07-06 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13981#discussion_r69703134
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
 ---
@@ -96,6 +97,25 @@ class DecisionTreeRegressorSuite
   assert(variance === expectedVariance,
 s"Expected variance $expectedVariance but got $variance.")
 }
+
+val varianceData: RDD[LabeledPoint] = TreeTests.varianceData(sc)
+val varianceDF = TreeTests.setMetadata(varianceData, Map.empty[Int, 
Int], 0)
+dt.setMaxDepth(1)
+  .setMaxBins(6)
+  .setSeed(0)
+val transformVarDF = dt.fit(varianceDF).transform(varianceDF)
+val calculatedVariances = 
transformVarDF.select(dt.getVarianceCol).collect().map {
--- End diff --

Do the mapping before `collect`, so you work with a type-safe `Dataset` (not a `DataFrame`, which is `Dataset[Row]`).
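
A sketch of the idea, assuming the variance column is a `Double` and the session implicits are in scope to provide the encoder:

```scala
val calculatedVariances = transformVarDF
  .select(dt.getVarianceCol)
  .map(_.getDouble(0)) // map before collect, so this stays a typed Dataset[Double]
  .collect()
```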





[GitHub] spark pull request #13981: [SPARK-16307] [ML] Add test to verify the predict...

2016-07-05 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/13981#discussion_r69631781
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
 ---
@@ -96,6 +97,25 @@ class DecisionTreeRegressorSuite
   assert(variance === expectedVariance,
 s"Expected variance $expectedVariance but got $variance.")
 }
+
+val varianceData: RDD[LabeledPoint] = TreeTests.varianceData(sc)
+val varianceDF = TreeTests.setMetadata(varianceData, Map.empty[Int, 
Int], 0)
+dt.setMaxDepth(1)
+  .setMaxBins(6)
+  .setSeed(0)
+val transformVarDF = dt.fit(varianceDF).transform(varianceDF)
+val calculatedVariances = 
transformVarDF.select(dt.getVarianceCol).collect().map {
--- End diff --

I wonder whether `toDS` would work here so you could get at the variance values more simply.





[GitHub] spark pull request #14034: [SPARK-16355] [SPARK-16354] [SQL] Fix Bugs When L...

2016-07-05 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14034#discussion_r69528969
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala ---
@@ -31,4 +33,46 @@ class StatisticsSuite extends QueryTest with 
SharedSQLContext {
   spark.sessionState.conf.autoBroadcastJoinThreshold)
   }
 
+  test("estimates the size of limit") {
+withTempTable("test") {
+  Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
+.createOrReplaceTempView("test")
+  Seq((0, 1), (1, 24), (2, 48)).foreach { case (limit, expected) =>
+val df = sql(s"""SELECT * FROM test limit $limit""")
+
+val sizesGlobalLimit = df.queryExecution.analyzed.collect { case 
g: GlobalLimit =>
+  g.statistics.sizeInBytes
+}
+assert(sizesGlobalLimit.size === 1, s"Size wrong for:\n 
${df.queryExecution}")
+assert(sizesGlobalLimit(0).equals(BigInt(expected)),
+  s"expected exact size 24 for table 'test', got: 
${sizesGlobalLimit(0)}")
+
+val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: 
LocalLimit =>
+  l.statistics.sizeInBytes
+}
+assert(sizesLocalLimit.size === 1, s"Size wrong for:\n 
${df.queryExecution}")
+assert(sizesLocalLimit(0).equals(BigInt(expected)),
+  s"expected exact size 24 for table 'test', got: 
${sizesLocalLimit(0)}")
+  }
+}
+  }
+
+  test("estimates the size of a limit 0 on outer join") {
+withTempTable("test") {
+  Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
+.createOrReplaceTempView("test")
+  val df1 = spark.table("test")
+  val df2 = spark.table("test").limit(0)
+  val df = df1.join(df2, Seq("k"), "left")
+
+  val sizes = df.queryExecution.analyzed.collect { case g: Join =>
+g.statistics.sizeInBytes
+  }
+
+  assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
+  assert(sizes(0).equals(BigInt(96)),
--- End diff --

Why do you use `equals`? Would `===` not work here?
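
i.e. something like (with the message adjusted to match the expected `96`):

```scala
assert(sizes(0) === BigInt(96),
  s"expected exact size 96 for table 'test', got: ${sizes(0)}")
```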





[GitHub] spark pull request #14047: [SPARK-16368] [SQL] Fix Strange Errors When Creat...

2016-07-05 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14047#discussion_r69527187
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -391,6 +391,29 @@ class HiveDDLSuite
 }
   }
 
+  test("create view with mismatched schema") {
--- End diff --

Yes, you're right. I must've overlooked this not-so-little difference. Is `temporary` the only difference between the tests? Could we have a template test method that generates the proper variant for each case? (Even if that were too much for this change, it'd pave the way for more tests like this in the future.) We could also do it as a follow-up change after this one is committed. WDYT @srowen @rxin? Is this worth the effort? I'm concerned about the duplication.





[GitHub] spark pull request #14034: [SPARK-16355] [SPARK-16354] [SQL] Fix Bugs When L...

2016-07-04 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14034#discussion_r69488260
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -251,6 +251,22 @@ trait CheckAnalysis extends PredicateHelper {
 s"but one table has '${firstError.output.length}' columns 
and another table has " +
 s"'${s.children.head.output.length}' columns")
 
+  case l: GlobalLimit =>
+val numRows = l.limitExpr.eval().asInstanceOf[Int]
+if (numRows < 0) {
+  failAnalysis(
+s"number_rows in limit clause must be equal to or greater 
than 0. " +
+  s"number_rows:$numRows")
+}
+
+  case l: LocalLimit =>
--- End diff --

What do you think about merging the two cases into `case l @ (_: LocalLimit | _: GlobalLimit) =>` to remove the duplication (or at least introducing a local method)?
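
A rough sketch of the merged case, assuming both nodes keep exposing the limit expression as their first constructor argument:

```scala
case l @ (_: LocalLimit | _: GlobalLimit) =>
  val limitExpr = l match {
    case LocalLimit(e, _) => e
    case GlobalLimit(e, _) => e
  }
  val numRows = limitExpr.eval().asInstanceOf[Int]
  if (numRows < 0) {
    failAnalysis(
      s"number_rows in limit clause must be equal to or greater than 0. " +
        s"number_rows:$numRows")
  }
```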





[GitHub] spark pull request #14047: [SPARK-16368] [SQL] Fix Strange Errors When Creat...

2016-07-04 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14047#discussion_r69487586
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -391,6 +391,29 @@ class HiveDDLSuite
 }
   }
 
+  test("create view with mismatched schema") {
--- End diff --

Did you copy the tests? Could you file a JIRA issue to get rid of the 
duplication at the very least? Thanks.





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-04 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69432798
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
 ---
@@ -55,7 +56,7 @@ class LogisticRegressionSuite
 generateMultinomialLogisticInput(coefficients, xMean, xVariance,
   addIntercept = true, nPoints, 42)
 
-  spark.createDataFrame(sc.parallelize(testData, 4))
+  sc.parallelize(testData, 4).toDF()
--- End diff --

It'd be nice to know what the purpose of the explicit partition setting was.





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69391176
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala ---
@@ -282,9 +281,7 @@ class MLUtilsSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 val z = Vectors.dense(4.0).asML
 val p = (5.0, z)
 val w = Vectors.dense(6.0)
-val df = spark.createDataFrame(Seq(
-  (0, x, y, p, w)
-)).toDF("id", "x", "y", "p", "w")
+val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w")
   .withColumn("x", col("x"), metadata)
--- End diff --

Replace `col("x")` with `$"x"` or (better) `'x`





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69391151
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
 ---
@@ -52,23 +53,20 @@ class GeneralizedLinearRegressionSuite
 
 import GeneralizedLinearRegressionSuite._
 
-datasetGaussianIdentity = spark.createDataFrame(
-  sc.parallelize(generateGeneralizedLinearRegressionInput(
-intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = 
Array(2.9, 10.5),
-xVariance = Array(0.7, 1.2), nPoints = 1, seed, noiseLevel = 
0.01,
-family = "gaussian", link = "identity"), 2))
+datasetGaussianIdentity = 
sc.parallelize(generateGeneralizedLinearRegressionInput(
--- End diff --

Why is this `sc.parallelize` needed here? Why are `2` partitions used?





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69391139
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala ---
@@ -102,7 +103,7 @@ class VectorIndexerSuite extends SparkFunSuite with 
MLlibTestSparkContext
   }
 
   test("Cannot fit an empty DataFrame") {
-val rdd = spark.createDataFrame(sc.parallelize(Array.empty[Vector], 
2).map(FeatureData))
+val rdd = sc.parallelize(Array.empty[Vector], 
2).map(FeatureData).toDF()
--- End diff --

Do you need `sc.parallelize`?





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69391120
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -39,7 +40,7 @@ class StringIndexerSuite
 
   test("StringIndexer") {
 val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), 
(4, "a"), (5, "c")), 2)
--- End diff --

Could you remove `sc.parallelize`, too?





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390423
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala ---
@@ -29,10 +29,11 @@ import org.apache.spark.sql.types._
 
 class OneHotEncoderSuite
   extends SparkFunSuite with MLlibTestSparkContext with 
DefaultReadWriteTest {
+  import testImplicits._
 
   def stringIndexed(): DataFrame = {
 val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), 
(4, "a"), (5, "c")), 2)
--- End diff --

Remove `sc.parallelize`.





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390388
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala ---
@@ -61,7 +62,7 @@ class NormalizerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
   Vectors.sparse(3, Seq())
 )
 
-dataFrame = spark.createDataFrame(sc.parallelize(data, 2).map(NormalizerSuite.FeatureData))
+dataFrame = sc.parallelize(data, 2).map(NormalizerSuite.FeatureData).toDF()
--- End diff --

Remove `sc.parallelize`
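
A possible shape (a sketch; it assumes `data` is the local collection built just above and that the 2-partition split doesn't matter here):

```scala
// Hypothetical: map the local collection and convert it directly; .toSeq covers
// the case where `data` is an Array rather than a Seq.
dataFrame = data.map(NormalizerSuite.FeatureData).toSeq.toDF()
```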





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390273
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala ---
@@ -57,8 +58,7 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
 
   test("MinMaxScaler arguments max must be larger than min") {
 withClue("arguments max must be larger than min") {
-  val dummyDF = spark.createDataFrame(Seq(
-    (1, Vectors.dense(1.0, 2.0)))).toDF("id", "feature")
+  val dummyDF = Seq((1, Vectors.dense(1.0, 2.0))).toDF("id", "feature")
--- End diff --

It's just a column name, but for consistency I'd use `features` (not `feature`), unless there's a reason for the singular here.
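
i.e. something like (a sketch only; a matching `setInputCol`, if any, would need the same rename):

```scala
// Hypothetical: same one-liner, with the conventional plural column name.
val dummyDF = Seq((1, Vectors.dense(1.0, 2.0))).toDF("id", "features")
```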





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390216
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/CountVectorizerSuite.scala ---
@@ -44,7 +45,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
   (3, split(""), Vectors.sparse(4, Seq())), // empty string
--- End diff --

Instead of the `// empty string` comment, how about a named constant, e.g. `val EMPTY_STRING = ""`, used in the row itself?
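
For instance (a sketch; `split` is the suite's existing helper, and the `emptyRow` name is only for illustration):

```scala
// Hypothetical: a named constant makes the intent self-documenting.
val EMPTY_STRING = ""
val emptyRow = (3, split(EMPTY_STRING), Vectors.sparse(4, Seq()))
```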





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390204
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/evaluation/RegressionEvaluatorSuite.scala ---
@@ -42,9 +43,10 @@ class RegressionEvaluatorSuite
  * data.map(x=> x.label + ", " + x.features(0) + ", " + x.features(1))
  *   .saveAsTextFile("path")
  */
-val dataset = spark.createDataFrame(
-  sc.parallelize(LinearDataGenerator.generateLinearInput(
-6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2).map(_.asML))
+val dataset = sc.parallelize(
--- End diff --

Remove `sc.parallelize`
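
Perhaps something like (a sketch; it assumes the 2-partition split isn't significant for the evaluator):

```scala
// Hypothetical: generate the points locally, convert them to the new ML vectors,
// and build the DataFrame without an explicit RDD.
val dataset = LinearDataGenerator.generateLinearInput(
    6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1)
  .map(_.asML).toDF()
```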





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390189
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala ---
@@ -158,7 +159,7 @@ class RandomForestClassifierSuite
   }
 
   test("Fitting without numClasses in metadata") {
-val df: DataFrame = spark.createDataFrame(TreeTests.featureImportanceData(sc))
+val df: DataFrame = TreeTests.featureImportanceData(sc).toDF()
--- End diff --

Why is the type annotation needed here?
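
`.toDF()` already returns a `DataFrame`, so presumably (a sketch):

```scala
// Hypothetical: drop the annotation and let the type be inferred.
val df = TreeTests.featureImportanceData(sc).toDF()
```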





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390185
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala ---
@@ -55,7 +56,7 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
 val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
 rdd = sc.parallelize(generateMultinomialLogisticInput(
   coefficients, xMean, xVariance, true, nPoints, 42), 2)
-dataset = spark.createDataFrame(rdd)
+dataset = rdd.toDF()
--- End diff --

Merge it with line 57.
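
i.e. something like (a sketch; "line 57" is the `rdd = sc.parallelize(...)` assignment above, and it assumes `rdd` isn't used by other tests):

```scala
// Hypothetical merge of the two assignments; the intermediate rdd field can go
// away if nothing else reads it.
dataset = sc.parallelize(generateMultinomialLogisticInput(
  coefficients, xMean, xVariance, true, nPoints, 42), 2).toDF()
```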





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390178
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala ---
@@ -47,7 +48,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
   Array(0.10, 0.10, 0.70, 0.10)  // label 2
 ).map(_.map(math.log))
 
-dataset = spark.createDataFrame(generateNaiveBayesInput(pi, theta, 100, 42))
+dataset = generateNaiveBayesInput(pi, theta, 100, 42).toDF()
--- End diff --

Exactly my point above :)





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390173
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala ---
@@ -116,7 +117,7 @@ class MultilayerPerceptronClassifierSuite
 // the input seed is somewhat magic, to make this test pass
 val rdd = sc.parallelize(generateMultinomialLogisticInput(
   coefficients, xMean, xVariance, true, nPoints, 1), 2)
-val dataFrame = spark.createDataFrame(rdd).toDF("label", "features")
+val dataFrame = rdd.toDF("label", "features")
--- End diff --

Could we merge this line with 118? I don't think 118 needs `sc.parallelize`.
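
Roughly (a sketch; it assumes `rdd` isn't needed elsewhere and that the explicit partitioning isn't significant here):

```scala
// Hypothetical: generate the rows locally and convert them in one step; the
// intermediate RDD and explicit partitioning disappear.
val dataFrame = generateMultinomialLogisticInput(
  coefficients, xMean, xVariance, true, nPoints, 1).toDF("label", "features")
```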





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390144
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala ---
@@ -55,7 +56,7 @@ class LogisticRegressionSuite
 generateMultinomialLogisticInput(coefficients, xMean, xVariance,
   addIntercept = true, nPoints, 42)
 
-  spark.createDataFrame(sc.parallelize(testData, 4))
+  sc.parallelize(testData, 4).toDF()
--- End diff --

`testData.toDF.repartition(4)`?
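
For completeness, the full expression would read something like (a sketch; `repartition(4)` keeps the 4-way split that the `numSlices` argument used to provide, if the test relies on it):

```scala
// Hypothetical: convert the local Seq directly, then repartition to mirror
// sc.parallelize(testData, 4).
testData.toDF().repartition(4)
```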





[GitHub] spark pull request #14035: [SPARK-16356][ML] Add testImplicits for ML unit t...

2016-07-03 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/14035#discussion_r69390147
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala ---
@@ -869,8 +870,7 @@ class LogisticRegressionSuite
 }
   }
 
-  (spark.createDataFrame(sc.parallelize(data1, 4)),
-spark.createDataFrame(sc.parallelize(data2, 4)))
+  (sc.parallelize(data1, 4).toDF(), sc.parallelize(data2, 4).toDF())
--- End diff --

Same as above
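
i.e. (a sketch, following the same pattern):

```scala
// Hypothetical: apply the same local-Seq conversion to both DataFrames.
(data1.toDF().repartition(4), data2.toDF().repartition(4))
```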




