[jira] [Commented] (SPARK-26308) Large BigDecimal value is converted to null when passed into a UDF

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725614#comment-16725614
 ] 

ASF GitHub Bot commented on SPARK-26308:


asfgit closed pull request #23308: [SPARK-26308][SQL] Avoid cast of decimals 
for ScalaUDF
URL: https://github.com/apache/spark/pull/23308
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 133fa119b7aa6..1706b3eece6d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -879,6 +879,37 @@ object TypeCoercion {
           }
         }
         e.withNewChildren(children)
+
+      case udf: ScalaUDF if udf.inputTypes.nonEmpty =>
+        val children = udf.children.zip(udf.inputTypes).map { case (in, expected) =>
+          implicitCast(in, udfInputToCastType(in.dataType, expected)).getOrElse(in)
+        }
+        udf.withNewChildren(children)
+    }
+
+    private def udfInputToCastType(input: DataType, expectedType: DataType): DataType = {
+      (input, expectedType) match {
+        // SPARK-26308: avoid casting to an arbitrary precision and scale for decimals. Please note
+        // that precision and scale cannot be inferred properly for a ScalaUDF because, when it is
+        // created, it is not bound to any column. So here the precision and scale of the input
+        // column is used.
+        case (in: DecimalType, _: DecimalType) => in
+        case (ArrayType(dtIn, _), ArrayType(dtExp, nullableExp)) =>
+          ArrayType(udfInputToCastType(dtIn, dtExp), nullableExp)
+        case (MapType(keyDtIn, valueDtIn, _), MapType(keyDtExp, valueDtExp, nullableExp)) =>
+          MapType(udfInputToCastType(keyDtIn, keyDtExp),
+            udfInputToCastType(valueDtIn, valueDtExp),
+            nullableExp)
+        case (StructType(fieldsIn), StructType(fieldsExp)) =>
+          val fieldTypes =
+            fieldsIn.map(_.dataType).zip(fieldsExp.map(_.dataType)).map { case (dtIn, dtExp) =>
+              udfInputToCastType(dtIn, dtExp)
+            }
+          StructType(fieldsExp.zip(fieldTypes).map { case (field, newDt) =>
+            field.copy(dataType = newDt)
+          })
+        case (_, other) => other
+      }
     }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index fae90caebf96c..a23aaa3a0b3ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -52,7 +52,7 @@ case class ScalaUDF(
     udfName: Option[String] = None,
     nullable: Boolean = true,
     udfDeterministic: Boolean = true)
-  extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {
+  extends Expression with NonSQLExpression with UserDefinedExpression {
 
   // The constructor for SPARK 2.1 and 2.2
   def this(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 20dcefa7e3cad..a26d306cff6b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.math.BigDecimal
+
 import org.apache.spark.sql.api.java._
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.QueryExecution
@@ -26,7 +28,7 @@ import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationComm
 import org.apache.spark.sql.functions.{lit, udf}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
-import org.apache.spark.sql.types.{DataTypes, DoubleType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.QueryExecutionListener
 
 
@@ -420,4 +422,32 @@ class UDFSuite extends QueryTest with SharedSQLContext {
       checkAnswer(df, Seq(Row("null1x"), Row(null), Row("N3null")))
     }
   }
+
+  test("SPARK-26308: udf with decimal") {
+    val df1 = spark.createDataFrame(
+      sparkContext.parallelize(Seq(Row(new BigDecimal("20110002456556",
+      StructType(Seq(StructField("col1", DecimalType(30, 0)
+    val udf1 = org.apache.spark.sql.functions.udf((value:

[jira] [Resolved] (SPARK-26308) Large BigDecimal value is converted to null when passed into a UDF

2018-12-19 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-26308.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23308
[https://github.com/apache/spark/pull/23308]

> Large BigDecimal value is converted to null when passed into a UDF
> --
>
> Key: SPARK-26308
> URL: https://issues.apache.org/jira/browse/SPARK-26308
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jay Pranavamurthi
>Assignee: Marco Gaido
>Priority: Major
> Fix For: 3.0.0
>
>
> We are loading a Hive table into a Spark DataFrame. The Hive table has a 
> decimal(30, 0) column with values greater than Long.MAX_VALUE. The DataFrame 
> loads correctly.
> We then use a UDF to convert the decimal type to a String value. For decimal 
> values < Long.MAX_VALUE this works fine, but when the decimal value is > 
> Long.MAX_VALUE, the input to the UDF is *null*.
> Hive table schema and data:
> {code:java}
> create table decimal_test (col1 decimal(30, 0), col2 decimal(10, 0), col3 
> int, col4 string);
> insert into decimal_test values(20110002456556, 123456789, 10, 
> 'test1');
> {code}
>  
> Execution in spark-shell:
> _(Note that the first column in the final output is null; it should have been 
> "20110002456556")_
> {code:java}
> scala> val df1 = spark.sqlContext.sql("select * from decimal_test")
> df1: org.apache.spark.sql.DataFrame = [col1: decimal(30,0), col2: 
> decimal(10,0) ... 2 more fields]
> scala> df1.show
> ++-++-+
> | col1| col2|col3| col4|
> ++-++-+
> |201100024...|123456789| 10|test1|
> ++-++-+
> scala> val decimalToString = (value: java.math.BigDecimal) => if (value == 
> null) null else { value.toBigInteger().toString }
> decimalToString: java.math.BigDecimal => String = <function1>
> scala> val udf1 = org.apache.spark.sql.functions.udf(decimalToString)
> udf1: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,StringType,Some(List(DecimalType(38,18))))
> scala> val df2 = df1.withColumn("col1", udf1(df1.col("col1")))
> df2: org.apache.spark.sql.DataFrame = [col1: string, col2: decimal(10,0) ... 
> 2 more fields]
> scala> df2.show
> ++-++-+
> |col1| col2|col3| col4|
> ++-++-+
> |null|123456789| 10|test1|
> ++-++-+
> {code}
> Oddly this works if we change the "decimalToString" udf to take an "Any" 
> instead of a "java.math.BigDecimal"
> {code:java}
> scala> val decimalToString = (value: Any) => if (value == null) null else { 
> if (value.isInstanceOf[java.math.BigDecimal]) 
> value.asInstanceOf[java.math.BigDecimal].toBigInteger().toString else null }
> decimalToString: Any => String = <function1>
> scala> val udf1 = org.apache.spark.sql.functions.udf(decimalToString)
> udf1: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,StringType,None)
> scala> val df2 = df1.withColumn("col1", udf1(df1.col("col1")))
> df2: org.apache.spark.sql.DataFrame = [col1: string, col2: decimal(10,0) ... 
> 2 more fields]
> scala> df2.show
> ++-++-+
> | col1| col2|col3| col4|
> ++-++-+
> |201100024...|123456789| 10|test1|
> ++-++-+
> {code}
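For context, the null in the first run comes from the implicit cast that this PR removes: a UDF parameter declared as java.math.BigDecimal used to be typed as DecimalType(38,18), which leaves only 38 - 18 = 20 digits for the integer part, so a decimal(30,0) value with more integer digits overflows and the cast yields null. A minimal sketch that reproduces the overflow directly (the value below is illustrative, not the one from the report, and a spark-shell session with a SparkSession named `spark` is assumed):

{code:java}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

// A decimal(30, 0) value with 22 integer digits (illustrative only).
val df = spark.sql("SELECT CAST('1234567890123456789012' AS DECIMAL(30, 0)) AS col1")

// Casting to the UDF's previously inferred input type DecimalType(38, 18) leaves
// only 20 integer digits, so the value overflows and the cast returns null.
df.select(col("col1").cast(DecimalType(38, 18)).as("col1")).show()
{code}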



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26308) Large BigDecimal value is converted to null when passed into a UDF

2018-12-19 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-26308:
---

Assignee: Marco Gaido

> Large BigDecimal value is converted to null when passed into a UDF
> --
>
> Key: SPARK-26308
> URL: https://issues.apache.org/jira/browse/SPARK-26308
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jay Pranavamurthi
>Assignee: Marco Gaido
>Priority: Major
>
> We are loading a Hive table into a Spark DataFrame. The Hive table has a 
> decimal(30, 0) column with values greater than Long.MAX_VALUE. The DataFrame 
> loads correctly.
> We then use a UDF to convert the decimal type to a String value. For decimal 
> values < Long.MAX_VALUE this works fine, but when the decimal value is > 
> Long.MAX_VALUE, the input to the UDF is *null*.
> Hive table schema and data:
> {code:java}
> create table decimal_test (col1 decimal(30, 0), col2 decimal(10, 0), col3 
> int, col4 string);
> insert into decimal_test values(20110002456556, 123456789, 10, 
> 'test1');
> {code}
>  
> Execution in spark-shell:
> _(Note that the first column in the final output is null; it should have been 
> "20110002456556")_
> {code:java}
> scala> val df1 = spark.sqlContext.sql("select * from decimal_test")
> df1: org.apache.spark.sql.DataFrame = [col1: decimal(30,0), col2: 
> decimal(10,0) ... 2 more fields]
> scala> df1.show
> ++-++-+
> | col1| col2|col3| col4|
> ++-++-+
> |201100024...|123456789| 10|test1|
> ++-++-+
> scala> val decimalToString = (value: java.math.BigDecimal) => if (value == 
> null) null else { value.toBigInteger().toString }
> decimalToString: java.math.BigDecimal => String = <function1>
> scala> val udf1 = org.apache.spark.sql.functions.udf(decimalToString)
> udf1: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,StringType,Some(List(DecimalType(38,18))))
> scala> val df2 = df1.withColumn("col1", udf1(df1.col("col1")))
> df2: org.apache.spark.sql.DataFrame = [col1: string, col2: decimal(10,0) ... 
> 2 more fields]
> scala> df2.show
> ++-++-+
> |col1| col2|col3| col4|
> ++-++-+
> |null|123456789| 10|test1|
> ++-++-+
> {code}
> Oddly this works if we change the "decimalToString" udf to take an "Any" 
> instead of a "java.math.BigDecimal"
> {code:java}
> scala> val decimalToString = (value: Any) => if (value == null) null else { 
> if (value.isInstanceOf[java.math.BigDecimal]) 
> value.asInstanceOf[java.math.BigDecimal].toBigInteger().toString else null }
> decimalToString: Any => String = <function1>
> scala> val udf1 = org.apache.spark.sql.functions.udf(decimalToString)
> udf1: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,StringType,None)
> scala> val df2 = df1.withColumn("col1", udf1(df1.col("col1")))
> df2: org.apache.spark.sql.DataFrame = [col1: string, col2: decimal(10,0) ... 
> 2 more fields]
> scala> df2.show
> ++-++-+
> | col1| col2|col3| col4|
> ++-++-+
> |201100024...|123456789| 10|test1|
> ++-++-+
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26318) Deprecate function merge in Row

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725595#comment-16725595
 ] 

ASF GitHub Bot commented on SPARK-26318:


asfgit closed pull request #23271: [SPARK-26318][SQL] Deprecate Row.merge
URL: https://github.com/apache/spark/pull/23271
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index e12bf9616e2de..4f5af9ac80b10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -57,6 +57,7 @@ object Row {
   /**
    * Merge multiple rows into a single row, one after another.
    */
+  @deprecated("This method is deprecated and will be removed in future versions.", "3.0.0")
   def merge(rows: Row*): Row = {
     // TODO: Improve the performance of this if used in performance critical part.
     new GenericRow(rows.flatMap(_.toSeq).toArray)


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Deprecate function merge in Row
> ---
>
> Key: SPARK-26318
> URL: https://issues.apache.org/jira/browse/SPARK-26318
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang Li
>Assignee: Liang Li
>Priority: Trivial
> Fix For: 3.0.0
>
>
> Seems no one uses Row.merge. We might have to deprecate it.
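For reference, a minimal sketch of what the deprecated method does (illustrative usage, not taken from the PR): Row.merge concatenates the field values of its arguments into a single Row.

{code:java}
import org.apache.spark.sql.Row

// The fields of the input rows are concatenated in order.
val merged: Row = Row.merge(Row(1, "a"), Row(true))
assert(merged == Row(1, "a", true))
{code}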



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26318) Deprecate function merge in Row

2018-12-19 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-26318:
-
Description: Seems no one uses Row.merge. We might have to deprecate it.  
(was: Enhance function merge performance in Row

Like do 1 time Row.merge for input 

val row1 = Row("name", "work", 2314, "null", 1, ""), it need 108458 millisecond

After do some enhancement, it only need 24967 millisecond)

> Deprecate function merge in Row
> ---
>
> Key: SPARK-26318
> URL: https://issues.apache.org/jira/browse/SPARK-26318
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang Li
>Assignee: Liang Li
>Priority: Trivial
> Fix For: 3.0.0
>
>
> Seems no one uses Row.merge. We might have to deprecate it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26318) Deprecate function merge in Row

2018-12-19 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26318.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23271
[https://github.com/apache/spark/pull/23271]

> Deprecate function merge in Row
> ---
>
> Key: SPARK-26318
> URL: https://issues.apache.org/jira/browse/SPARK-26318
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang Li
>Assignee: Liang Li
>Priority: Trivial
> Fix For: 3.0.0
>
>
> Enhance the performance of Row.merge.
> For example, calling Row.merge once on the input
> val row1 = Row("name", "work", 2314, "null", 1, "") takes 108458 
> milliseconds; after some enhancement it takes only 24967 milliseconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26318) Deprecate function merge in Row

2018-12-19 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-26318:


Assignee: Liang Li

> Deprecate function merge in Row
> ---
>
> Key: SPARK-26318
> URL: https://issues.apache.org/jira/browse/SPARK-26318
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang Li
>Assignee: Liang Li
>Priority: Trivial
> Fix For: 3.0.0
>
>
> Enhance the performance of Row.merge.
> For example, calling Row.merge once on the input
> val row1 = Row("name", "work", 2314, "null", 1, "") takes 108458 
> milliseconds; after some enhancement it takes only 24967 milliseconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26409) SQLConf should be serializable in test sessions

2018-12-19 Thread Gengliang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-26409:
---
Affects Version/s: 2.4.0

> SQLConf should be serializable in test sessions
> ---
>
> Key: SPARK-26409
> URL: https://issues.apache.org/jira/browse/SPARK-26409
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Tests
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Gengliang Wang
>Priority: Major
>
> `SQLConf` is supposed to be serializable. However, it is currently not 
> serializable in `WithTestConf`. `WithTestConf` uses the method 
> `overrideConfs` in a closure, while the classes that implement it 
> (`TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder`) are not 
> serializable.
> Use a local variable to fix it.
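A minimal sketch of the problem and of the proposed fix (the class and method names below are illustrative, not the actual Spark test builders): referring to a method of the enclosing class inside a closure captures the enclosing instance, so if that instance is not serializable the closure is not serializable either; copying the value into a local variable first avoids the capture.

{code:java}
// Hypothetical builder standing in for the non-serializable test session state builders.
class TestStateBuilder {
  def overrideConfs: Map[String, String] = Map("spark.some.conf" -> "value")

  // Problematic: the closure calls this.overrideConfs, so it captures `this` and
  // serializing the closure drags the whole non-serializable builder along.
  def confGetterBad: () => Map[String, String] = () => overrideConfs

  // Fix: bind the result to a local val; the closure then captures only the Map.
  def confGetterGood: () => Map[String, String] = {
    val confs = overrideConfs
    () => confs
  }
}
{code}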



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown

2018-12-19 Thread Fengyu Cao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725533#comment-16725533
 ] 

Fengyu Cao commented on SPARK-26389:


The console output sink forces the use of a temp checkpoint (I just want to test my code),

and there is no way to disable checkpointing.
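For testing, one way to avoid the temp checkpoint entirely is to point the query at an explicit checkpoint directory, so nothing is created under /tmp. A minimal sketch (the source, sink, and path are placeholders, and a SparkSession named `spark` is assumed):

{code:java}
val query = spark.readStream
  .format("rate")            // placeholder source; any streaming source works
  .load()
  .writeStream
  .format("console")
  // With an explicit checkpointLocation the query does not create a
  // /tmp/temporary-* directory and can be restarted from this path.
  .option("checkpointLocation", "/path/to/checkpoints/my-query")
  .start()
{code}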

> temp checkpoint folder at executor should be deleted on graceful shutdown
> -
>
> Key: SPARK-26389
> URL: https://issues.apache.org/jira/browse/SPARK-26389
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Fengyu Cao
>Priority: Major
>
> {{spark-submit --master mesos:// -conf 
> spark.streaming.stopGracefullyOnShutdown=true  framework>}}
> CTRL-C, framework shutdown
> {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = 
> f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 
> 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error 
> org.apache.spark.SparkException: Writing job aborted.}}
> {{/tmp/temporary- on the executor is not deleted due to 
> org.apache.spark.SparkException: Writing job aborted., and this temp 
> checkpoint can't be used for recovery.}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25271) Creating parquet table with all the column null throws exception

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725532#comment-16725532
 ] 

ASF GitHub Bot commented on SPARK-25271:


asfgit closed pull request #22514: [SPARK-25271][SQL] Hive ctas commands should 
use data source if it is convertible
URL: https://github.com/apache/spark/pull/22514
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index e1faecedd20ed..096481f68275d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -820,6 +820,14 @@ object DDLUtils {
     table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
   }
 
+  def readHiveTable(table: CatalogTable): HiveTableRelation = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
+  }
+
   /**
    * Throws a standard error for actions that require partitionProvider = hive.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index b304e2da6e1cf..b5cf8c9515bfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -244,27 +244,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     })
   }
 
-  private def readHiveTable(table: CatalogTable): LogicalPlan = {
-    HiveTableRelation(
-      table,
-      // Hive table columns are always nullable.
-      table.dataSchema.asNullable.toAttributes,
-      table.partitionSchema.asNullable.toAttributes)
-  }
-
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
         if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta))
 
     case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
-      i.copy(table = readHiveTable(tableMeta))
+      i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
     case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
       readDataSourceTable(tableMeta)
 
     case UnresolvedCatalogRelation(tableMeta) =>
-      readHiveTable(tableMeta)
+      DDLUtils.readHiveTable(tableMeta)
   }
 }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5823548a8063c..03f4b8d83e353 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.util.Locale
+
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.Striped
@@ -29,6 +31,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
@@ -113,7 +117,44 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 }
   }
 
-  def convertToLogicalRelation(
+  // Return true for Apache ORC and Hive ORC-related configuration names.
+  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
+  private def isOrcProperty(key: String) =
+    key.startsWith("orc.") || key.contains(".orc.")
+
+  private def isParquetProperty(key: String) =
+    key.startsWith("parquet.") || key.contains(".parquet.")
+
+  def convert(relation: HiveTableRelation): LogicalRelation = {
+    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+
+    // Consider table and storage properties. For properties existing in both sides, storage
+    // properties will supersede table properties.
+

[jira] [Assigned] (SPARK-25271) Creating parquet table with all the column null throws exception

2018-12-19 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-25271:
---

Assignee: Liang-Chi Hsieh

> Creating parquet table with all the column null throws exception
> 
>
> Key: SPARK-25271
> URL: https://issues.apache.org/jira/browse/SPARK-25271
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Shivu Sondur
>Assignee: Liang-Chi Hsieh
>Priority: Critical
> Fix For: 3.0.0
>
> Attachments: image-2018-09-07-09-12-34-944.png, 
> image-2018-09-07-09-29-33-370.png, image-2018-09-07-09-29-52-899.png, 
> image-2018-09-07-09-32-43-892.png, image-2018-09-07-09-33-03-095.png
>
>
> {code:java}
>  1)cat /data/parquet.dat
> 1$abc2$pqr:3$xyz
> null{code}
>  
> {code:java}
> 2)spark.sql("create table vp_reader_temp (projects map) ROW 
> FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':' 
> MAP KEYS TERMINATED BY '$'")
> {code}
> {code:java}
> 3)spark.sql("
> LOAD DATA LOCAL INPATH '/data/parquet.dat' INTO TABLE vp_reader_temp")
> {code}
> {code:java}
> 4)spark.sql("create table vp_reader STORED AS PARQUET as select * from 
> vp_reader_temp")
> {code}
> *Result:* Throws an exception (works fine with Spark 2.2.1)
> {code:java}
> java.lang.RuntimeException: Parquet record is malformed: empty fields are 
> illegal, the field should be ommited completely instead
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:64)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
>   at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:123)
>   at 
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:180)
>   at 
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:46)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:112)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:125)
>   at 
> org.apache.spark.sql.hive.execution.HiveOutputWriter.write(HiveFileFormat.scala:149)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:406)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:283)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:281)
>   at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1438)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:286)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:211)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:210)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:349)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are 
> illegal, the field should be ommited completely instead
>   at 
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:320)
>   at 
> org.apache.parquet.io.RecordConsumerLoggingWrapper.endField(RecordConsumerLoggingWrapper.java:165)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeMap(DataWritableWriter.java:241)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:116)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:60)
>   

[jira] [Resolved] (SPARK-25271) Creating parquet table with all the column null throws exception

2018-12-19 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-25271.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 22514
[https://github.com/apache/spark/pull/22514]

> Creating parquet table with all the column null throws exception
> 
>
> Key: SPARK-25271
> URL: https://issues.apache.org/jira/browse/SPARK-25271
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Shivu Sondur
>Assignee: Liang-Chi Hsieh
>Priority: Critical
> Fix For: 3.0.0
>
> Attachments: image-2018-09-07-09-12-34-944.png, 
> image-2018-09-07-09-29-33-370.png, image-2018-09-07-09-29-52-899.png, 
> image-2018-09-07-09-32-43-892.png, image-2018-09-07-09-33-03-095.png
>
>
> {code:java}
>  1)cat /data/parquet.dat
> 1$abc2$pqr:3$xyz
> null{code}
>  
> {code:java}
> 2)spark.sql("create table vp_reader_temp (projects map) ROW 
> FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':' 
> MAP KEYS TERMINATED BY '$'")
> {code}
> {code:java}
> 3)spark.sql("
> LOAD DATA LOCAL INPATH '/data/parquet.dat' INTO TABLE vp_reader_temp")
> {code}
> {code:java}
> 4)spark.sql("create table vp_reader STORED AS PARQUET as select * from 
> vp_reader_temp")
> {code}
> *Result:* Throws an exception (works fine with Spark 2.2.1)
> {code:java}
> java.lang.RuntimeException: Parquet record is malformed: empty fields are 
> illegal, the field should be ommited completely instead
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:64)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
>   at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:123)
>   at 
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:180)
>   at 
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:46)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:112)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:125)
>   at 
> org.apache.spark.sql.hive.execution.HiveOutputWriter.write(HiveFileFormat.scala:149)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:406)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:283)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:281)
>   at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1438)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:286)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:211)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:210)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:349)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are 
> illegal, the field should be ommited completely instead
>   at 
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:320)
>   at 
> org.apache.parquet.io.RecordConsumerLoggingWrapper.endField(RecordConsumerLoggingWrapper.java:165)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeMap(DataWritableWriter.java:241)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:116)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
>   at 
> 

[jira] [Comment Edited] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown

2018-12-19 Thread Fengyu Cao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725528#comment-16725528
 ] 

Fengyu Cao edited comment on SPARK-26389 at 12/20/18 2:45 AM:
--

Thanks for the reply.

Two scenarios:
 # {{temp checkpoint dir /tmp/temporary- on worker node}}
 # framework restart
 # {{temp checkpoint dir now /tmp/temporary- (/tmp/temporary- 
can't be used for recovery and should be deleted)}}

 # {{temp checkpoint dir /tmp/temporary- on worker node}}
 # executor stops for some reason
 # executor starts on another worker node (/tmp/temporary- can't be used for 
recovery either)

Maybe the temp checkpoint dir should be deleted on JVM stop?

{quote}spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not a 
Structured Streaming one.
{quote}
Sorry, I didn't notice this.


was (Author: camper42):
thanks for reply

Two scenarios:
 # {{temp checkpoint dir /tmp/temporary-}}
 # framework restart
 # {{temp checkpoint dir now /tmp/temporary- (/tmp/temporary- 
can't used to recovery and should be deleted)}}

 
 # {{temp checkpoint dir /tmp/temporary-}}
 # executor stop in some reason
 # executor start on another worker nodes (/tmp/temporary- can't used to 
recovery either)

 

Maybe temp checkpoint dir should be deleted on JVM stop?

 

 
{quote}spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not 
Structured Streaming one.
{quote}
 

sorry, I didn't notice this.

> temp checkpoint folder at executor should be deleted on graceful shutdown
> -
>
> Key: SPARK-26389
> URL: https://issues.apache.org/jira/browse/SPARK-26389
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Fengyu Cao
>Priority: Major
>
> {{spark-submit --master mesos:// -conf 
> spark.streaming.stopGracefullyOnShutdown=true  framework>}}
> CTRL-C, framework shutdown
> {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = 
> f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 
> 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error 
> org.apache.spark.SparkException: Writing job aborted.}}
> {{/tmp/temporary- on the executor is not deleted due to 
> org.apache.spark.SparkException: Writing job aborted., and this temp 
> checkpoint can't be used for recovery.}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26262) Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and CODEGEN_FACTORY_MODE

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725530#comment-16725530
 ] 

ASF GitHub Bot commented on SPARK-26262:


asfgit closed pull request #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on 
mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and CODEGEN_FACTORY_MODE
URL: https://github.com/apache/spark/pull/23213
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
index ec263ea70bd4a..7e81ff1aba37b 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
@@ -141,8 +141,3 @@ SELECT every("true");
 SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg;
 SELECT k, v, some(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg;
 SELECT k, v, any(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg;
-
--- simple explain of queries having every/some/any agregates. Optimized
--- plan should show the rewritten aggregate expression.
-EXPLAIN EXTENDED SELECT k, every(v), some(v), any(v) FROM test_agg GROUP BY k;
-
diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
index 41d316444ed6b..b3ec956cd178e 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
@@ -49,6 +49,3 @@ select * from values ("one", count(1)), ("two", 2) as data(a, b);
 
 -- string to timestamp
 select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b);
-
--- cross-join inline tables
-EXPLAIN EXTENDED SELECT * FROM VALUES ('one', 1), ('three', null) CROSS JOIN VALUES ('one', 1), ('three', null);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/operators.sql b/sql/core/src/test/resources/sql-tests/inputs/operators.sql
index 37f9cd44da7f2..ba14789d48db6 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/operators.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/operators.sql
@@ -29,27 +29,6 @@ select 2 * 5;
 select 5 % 3;
 select pmod(-7, 3);
 
--- check operator precedence.
--- We follow Oracle operator precedence in the table below that lists the levels of precedence
--- among SQL operators from high to low:
---
--- Operator                                          Operation
---
--- +, -                                              identity, negation
--- *, /                                              multiplication, division
--- +, -, ||                                          addition, subtraction, concatenation
--- =, !=, <, >, <=, >=, IS NULL, LIKE, BETWEEN, IN   comparison
--- NOT                                               exponentiation, logical negation
--- AND                                               conjunction
--- OR                                                disjunction
---
-explain select 'a' || 1 + 2;
-explain select 1 - 2 || 'b';
-explain select 2 * 4  + 3 || 'b';
-explain select 3 + 1 || 'a' || 4 / 2;
-explain select 1 == 1 OR 'a' || 'b' ==  'ab';
-explain select 'a' || 'c' == 'ac' AND 2 == 3;
-
 -- math functions
 select cot(1);
 select cot(null);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql
index f1461032065ad..1ae49c8bfc76a 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql
@@ -12,11 +12,6 @@ SELECT nullif(1, 2.1d), nullif(1, 1.0d);
 SELECT nvl(1, 2.1d), nvl(null, 2.1d);
 SELECT nvl2(null, 1, 2.1d), nvl2('n', 1, 2.1d);
 
--- explain for these functions; use range to avoid constant folding
-explain extended
-select ifnull(id, 'x'), nullif(id, 'x'), nvl(id, 'x'), nvl2(id, 'x', 'y')
-from range(2);
-
 -- SPARK-16730 cast alias functions for Hive compatibility
 SELECT boolean(1), tinyint(1), smallint(1), int(1), bigint(1);
 SELECT float(1), double(1), decimal(1);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql
index 2effb43183d75..fbc231627e36f 100644
--- 

[jira] [Resolved] (SPARK-26262) Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and CODEGEN_FACTORY_MODE

2018-12-19 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-26262.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23213
[https://github.com/apache/spark/pull/23213]

> Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and 
> CODEGEN_FACTORY_MODE
> 
>
> Key: SPARK-26262
> URL: https://issues.apache.org/jira/browse/SPARK-26262
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Takeshi Yamamuro
>Assignee: Takeshi Yamamuro
>Priority: Minor
> Fix For: 3.0.0
>
>
> For better test coverage, we need to run `SQLQueryTestSuite` on 4 mixed 
> config sets:
> 1. WHOLESTAGE_CODEGEN_ENABLED=true, CODEGEN_FACTORY_MODE=CODEGEN_ONLY
> 2. WHOLESTAGE_CODEGEN_ENABLED=false, CODEGEN_FACTORY_MODE=CODEGEN_ONLY
> 3. WHOLESTAGE_CODEGEN_ENABLED=true, CODEGEN_FACTORY_MODE=NO_CODEGEN
> 4. WHOLESTAGE_CODEGEN_ENABLED=false, CODEGEN_FACTORY_MODE=NO_CODEGEN
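A minimal sketch of how the four combinations above can be iterated in a test (assuming the `withSQLConf` helper from Spark's SQLHelper test trait is in scope; this is not the actual SQLQueryTestSuite code):

{code:java}
import org.apache.spark.sql.internal.SQLConf

val codegenConfigSets = for {
  wholeStage  <- Seq("true", "false")
  factoryMode <- Seq("CODEGEN_ONLY", "NO_CODEGEN")
} yield Seq(
  SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStage,
  SQLConf.CODEGEN_FACTORY_MODE.key -> factoryMode)

codegenConfigSets.foreach { configSet =>
  withSQLConf(configSet: _*) {
    // run the golden-file queries under this configuration
  }
}
{code}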



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown

2018-12-19 Thread Fengyu Cao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725528#comment-16725528
 ] 

Fengyu Cao edited comment on SPARK-26389 at 12/20/18 2:44 AM:
--

Thanks for the reply.

Two scenarios:
 # {{temp checkpoint dir /tmp/temporary-}}
 # framework restart
 # {{temp checkpoint dir now /tmp/temporary- (/tmp/temporary- 
can't be used for recovery and should be deleted)}}

 # {{temp checkpoint dir /tmp/temporary-}}
 # executor stops for some reason
 # executor starts on another worker node (/tmp/temporary- can't be used for 
recovery either)

Maybe the temp checkpoint dir should be deleted on JVM stop?

{quote}spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not a 
Structured Streaming one.
{quote}
Sorry, I didn't notice this.


was (Author: camper42):
thanks for reply

Two scenarios:
 # {{temp checkpoint dir /tmp/temporary-}}
 # framework restart
 # {{temp checkpoint dir now /tmp/temporary- (/tmp/temporary- 
can't used to recovery and should be deleted)}}

 
 # {{temp checkpoint dir /tmp/temporary-}}
 # executor stop in some reason
 # executor start on another worker nodes (/tmp/temporary- can't used to 
recovery either)

 

May be temp checkpoint dir should be deleted on JVM stop?

 

 
{quote}spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not 
Structured Streaming one.
{quote}
 

sorry, I didn't notice this.

> temp checkpoint folder at executor should be deleted on graceful shutdown
> -
>
> Key: SPARK-26389
> URL: https://issues.apache.org/jira/browse/SPARK-26389
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Fengyu Cao
>Priority: Major
>
> {{spark-submit --master mesos:// -conf 
> spark.streaming.stopGracefullyOnShutdown=true  framework>}}
> CTRL-C, framework shutdown
> {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = 
> f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 
> 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error 
> org.apache.spark.SparkException: Writing job aborted.}}
> {{/tmp/temporary- on the executor is not deleted due to 
> org.apache.spark.SparkException: Writing job aborted., and this temp 
> checkpoint can't be used for recovery.}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26262) Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and CODEGEN_FACTORY_MODE

2018-12-19 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-26262:
---

Assignee: Takeshi Yamamuro

> Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and 
> CODEGEN_FACTORY_MODE
> 
>
> Key: SPARK-26262
> URL: https://issues.apache.org/jira/browse/SPARK-26262
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Takeshi Yamamuro
>Assignee: Takeshi Yamamuro
>Priority: Minor
> Fix For: 3.0.0
>
>
> For better test coverage, we need to run `SQLQueryTestSuite` on 4 mixed 
> config sets:
> 1. WHOLESTAGE_CODEGEN_ENABLED=true, CODEGEN_FACTORY_MODE=CODEGEN_ONLY
> 2. WHOLESTAGE_CODEGEN_ENABLED=false, CODEGEN_FACTORY_MODE=CODEGEN_ONLY
> 3. WHOLESTAGE_CODEGEN_ENABLED=true, CODEGEN_FACTORY_MODE=NO_CODEGEN
> 4. WHOLESTAGE_CODEGEN_ENABLED=false, CODEGEN_FACTORY_MODE=NO_CODEGEN



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown

2018-12-19 Thread Fengyu Cao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725528#comment-16725528
 ] 

Fengyu Cao commented on SPARK-26389:


Thanks for the reply.

Two scenarios:
 # {{temp checkpoint dir /tmp/temporary-}}
 # framework restart
 # {{temp checkpoint dir now /tmp/temporary- (/tmp/temporary- 
can't be used for recovery and should be deleted)}}

 # {{temp checkpoint dir /tmp/temporary-}}
 # executor stops for some reason
 # executor starts on another worker node (/tmp/temporary- can't be used for 
recovery either)

Maybe the temp checkpoint dir should be deleted on JVM stop?

{quote}spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not a 
Structured Streaming one.
{quote}
Sorry, I didn't notice this.

> temp checkpoint folder at executor should be deleted on graceful shutdown
> -
>
> Key: SPARK-26389
> URL: https://issues.apache.org/jira/browse/SPARK-26389
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Fengyu Cao
>Priority: Major
>
> {{spark-submit --master mesos:// -conf 
> spark.streaming.stopGracefullyOnShutdown=true  framework>}}
> CTRL-C, framework shutdown
> {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = 
> f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 
> 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error 
> org.apache.spark.SparkException: Writing job aborted.}}
> {{/tmp/temporary- on the executor is not deleted due to 
> org.apache.spark.SparkException: Writing job aborted., and this temp 
> checkpoint can't be used for recovery.}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26417) Make comments for states available for logging

2018-12-19 Thread Evgenii Lartcev (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evgenii Lartcev updated SPARK-26417:

Description: When add an extra `SparkListener`, it would be very useful (in 
production applications) to get the description of a state. For now it only 
available as comments in code. As suggestion it could be moved into constructor 
as well as finalization flag at enum 
org.apache.spark.launcher.SparkAppHandle.State  (was: When add an extra 
`SparkListener`, it would be very useful (in production applications) to get 
the description of a state. Now it only available as comments in code. As 
suggestion it could be moved into constructor as well as finalization flag at 
enum org.apache.spark.launcher.SparkAppHandle.State)

> Make comments for states available for logging
> --
>
> Key: SPARK-26417
> URL: https://issues.apache.org/jira/browse/SPARK-26417
> Project: Spark
>  Issue Type: Wish
>  Components: Java API, Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Evgenii Lartcev
>Priority: Minor
>
> When adding an extra `SparkListener`, it would be very useful (in production 
> applications) to get the description of a state. For now it is only available as 
> comments in the code. As a suggestion, it could be moved into the constructor, along 
> with a finalization flag, at the enum org.apache.spark.launcher.SparkAppHandle.State



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26417) Make comments for states available for logging

2018-12-19 Thread Evgenii Lartcev (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evgenii Lartcev updated SPARK-26417:

Description: When add an extra `SparkListener`, it would be very useful (in 
production applications) to get the description of a state. Now it only 
available as comments in code. As suggestion it could be moved into constructor 
as well as finalization flag at enum 
org.apache.spark.launcher.SparkAppHandle.State  (was: When add an extra 
`SparkListener`, it would be very useful (in production applications) to get 
the description of a state. Now it only available in comments. As suggestion it 
could be moved into constructor as well as finalization flag at enum 
org.apache.spark.launcher.SparkAppHandle.State)

> Make comments for states available for logging
> --
>
> Key: SPARK-26417
> URL: https://issues.apache.org/jira/browse/SPARK-26417
> Project: Spark
>  Issue Type: Wish
>  Components: Java API, Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Evgenii Lartcev
>Priority: Minor
>
> When adding an extra `SparkListener`, it would be very useful (in production 
> applications) to get the description of a state. It is currently only available as 
> comments in the code. As a suggestion, it could be moved into the constructor, along 
> with a finalization flag, at the enum org.apache.spark.launcher.SparkAppHandle.State



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26417) Make comments for states available for logging

2018-12-19 Thread Evgenii Lartcev (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evgenii Lartcev updated SPARK-26417:

Description: When add an extra `SparkListener`, it would be very useful (in 
production applications) to get the description of a state. Now it only 
available in comments. As suggestion it could be moved into constructor as well 
as finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State  
(was: When add an extra `SparkListener`, it would be very useful in production 
mode to get the description of a state. Now it only available in comments. As 
suggestion it could be moved into constructor as well as finalization flag at 
enum org.apache.spark.launcher.SparkAppHandle.State)

> Make comments for states available for logging
> --
>
> Key: SPARK-26417
> URL: https://issues.apache.org/jira/browse/SPARK-26417
> Project: Spark
>  Issue Type: Wish
>  Components: Java API, Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Evgenii Lartcev
>Priority: Minor
>
> When adding an extra `SparkListener`, it would be very useful (in production 
> applications) to get the description of a state. It is currently only available in 
> comments. As a suggestion, it could be moved into the constructor, along with a 
> finalization flag, at the enum org.apache.spark.launcher.SparkAppHandle.State



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26417) Make comments for states available for logging

2018-12-19 Thread Evgenii Lartcev (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evgenii Lartcev updated SPARK-26417:

Affects Version/s: 2.3.2

> Make comments for states available for logging
> --
>
> Key: SPARK-26417
> URL: https://issues.apache.org/jira/browse/SPARK-26417
> Project: Spark
>  Issue Type: Wish
>  Components: Java API, Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Evgenii Lartcev
>Priority: Minor
>
> When adding an extra `SparkListener`, it would be very useful in production mode 
> to get the description of a state. It is currently only available in comments. As 
> a suggestion, it could be moved into the constructor, along with a finalization flag, 
> at the enum org.apache.spark.launcher.SparkAppHandle.State



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26417) Make comments for states available for logging

2018-12-19 Thread Evgenii Lartcev (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evgenii Lartcev updated SPARK-26417:

Description: When add an extra `SparkListener`, it would be very useful in 
production mode to get the description of a state. Now it only available in 
comments. As suggestion it could be moved into constructor as well as 
finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State  (was: 
When add an extra `SparkListener`, it would be very useful in production mode 
to get the description of a state. Now it only available in comments. As 
suggestion it could be moved into constructor as well as finalization flag.)

> Make comments for states available for logging
> --
>
> Key: SPARK-26417
> URL: https://issues.apache.org/jira/browse/SPARK-26417
> Project: Spark
>  Issue Type: Wish
>  Components: Java API, Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Evgenii Lartcev
>Priority: Minor
>
> When adding an extra `SparkListener`, it would be very useful in production mode 
> to get the description of a state. It is currently only available in comments. As 
> a suggestion, it could be moved into the constructor, along with a finalization flag, 
> at the enum org.apache.spark.launcher.SparkAppHandle.State



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26417) Make comments for states available for logging

2018-12-19 Thread Evgenii Lartcev (JIRA)
Evgenii Lartcev created SPARK-26417:
---

 Summary: Make comments for states available for logging
 Key: SPARK-26417
 URL: https://issues.apache.org/jira/browse/SPARK-26417
 Project: Spark
  Issue Type: Wish
  Components: Java API, Spark Core
Affects Versions: 2.4.0
Reporter: Evgenii Lartcev


When adding an extra `SparkListener`, it would be very useful in production mode 
to get the description of a state. It is currently only available in comments. As a 
suggestion, it could be moved into the constructor, along with a finalization flag.
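A minimal sketch of the suggestion, written as an illustrative Scala enumeration rather than the actual Java SparkAppHandle.State enum (the state names and descriptions below are illustrative): carry the description and the final-state flag in the constructor so a listener can log them directly.

{code:java}
// Illustrative only; the real SparkAppHandle.State is a Java enum.
sealed abstract class AppState(val description: String, val isFinal: Boolean)
object AppState {
  case object Connected extends AppState("The application has connected to the handle.", isFinal = false)
  case object Running   extends AppState("The application is running.", isFinal = false)
  case object Finished  extends AppState("The application finished successfully.", isFinal = true)
  case object Failed    extends AppState("The application finished with a failed status.", isFinal = true)
}

// A listener could then log state.description instead of just the state name.
{code}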



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26416) Refactor `ColumnPruning` from `Optimizer.scala` to `ColumnPruning.scala`

2018-12-19 Thread DB Tsai (JIRA)
DB Tsai created SPARK-26416:
---

 Summary: Refactor `ColumnPruning` from `Optimizer.scala` to 
`ColumnPruning.scala`
 Key: SPARK-26416
 URL: https://issues.apache.org/jira/browse/SPARK-26416
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.4.0
Reporter: DB Tsai
Assignee: DB Tsai
 Fix For: 3.0.0


As `Optimizer.scala` becomes bigger and bigger, it's hard to add new rules and 
maintain them. We are refactoring out `ColumnPruning` from `Optimizer.scala` to 
`ColumnPruning.scala` so it's easier to add new logic in `ColumnPruning`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26374) Support new date/timestamp parser in HadoopFsRelationTest

2018-12-19 Thread Maxim Gekk (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725422#comment-16725422
 ] 

Maxim Gekk commented on SPARK-26374:


Most likely this is the related ticket: 
https://bugs.openjdk.java.net/browse/JDK-8181465

> Support new date/timestamp parser in HadoopFsRelationTest
> -
>
> Key: SPARK-26374
> URL: https://issues.apache.org/jira/browse/SPARK-26374
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 2.4.0
>Reporter: Maxim Gekk
>Priority: Minor
>
> The *test all data types* test uses the legacy parser for dates/timestamps 
> (https://github.com/apache/spark/pull/23196/files#diff-3986f801dd1335af3f300b006a03e987R129).
>  The test passes in the UTC timezone with the new parser but fails in other 
> timezones 
> ([see|https://github.com/apache/spark/pull/23196#discussion_r241492360]):
> {code}
> == Correct Answer - 10 == == Spark Answer - 10 ==
> ...
> ![10,1670-02-11 14:09:54.746]  [10,1670-02-11 14:08:56.746]
> == Correct Answer - 10 == == Spark Answer - 10 ==
>  [1,6246-07-23 20:34:56.968]   [1,6246-07-23 20:34:56.968]
> ![2,0109-07-20 18:38:03.788]   [2,0109-07-20 18:37:05.788]
> {code}
> The ticket aims to switch to the new parser independently of the timezone.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725418#comment-16725418
 ] 

Gabor Somogyi commented on SPARK-26389:
---

{quote}can't used to recovery{quote}
What does this actually mean?

From a use-case perspective:
1. if you want to recover, why don't you just set a non-temporary checkpoint (see the sketch below)?
2. if you want to recover, why do you want it to get deleted?
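
A minimal sketch for point 1 (the sink format and paths are placeholder assumptions, not taken from this report): configure an explicit, durable checkpoint location instead of relying on the temporary one.

{code:python}
# Hedged sketch: an explicitly configured checkpoint directory survives restarts
# and can be used for recovery, unlike the auto-created /tmp/temporary-* location.
query = (df.writeStream                                       # df: a streaming DataFrame
         .format("parquet")                                   # placeholder sink
         .option("path", "hdfs:///data/out")                  # placeholder output path
         .option("checkpointLocation", "hdfs:///data/ckpt")   # durable checkpoint
         .start())
{code}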


> temp checkpoint folder at executor should be deleted on graceful shutdown
> -
>
> Key: SPARK-26389
> URL: https://issues.apache.org/jira/browse/SPARK-26389
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Fengyu Cao
>Priority: Major
>
> {{spark-submit --master mesos:// -conf 
> spark.streaming.stopGracefullyOnShutdown=true  framework>}}
> CTRL-C, framework shutdown
> {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = 
> f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 
> 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error 
> org.apache.spark.SparkException: Writing job aborted.}}
> {{/tmp/temporary- on executor not deleted due to 
> org.apache.spark.SparkException: Writing job aborted., and this temp 
> checkpoint can't used to recovery.}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable

2018-12-19 Thread Grant Henke (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725416#comment-16725416
 ] 

Grant Henke commented on SPARK-26415:
-

Apologies, feel free to adjust. I set the versions I thought would be appropriate, 
given that the interfaces have been the same since those versions/branches were 
released.

> Mark StreamSinkProvider and StreamSourceProvider as stable
> --
>
> Key: SPARK-26415
> URL: https://issues.apache.org/jira/browse/SPARK-26415
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Grant Henke
>Priority: Major
>
> This change marks the StreamSinkProvider and StreamSourceProvider
> interfaces as stable so that they can be relied on for compatibility for all of
> Spark 2.x.
> These interfaces have been available since Spark 2.0.0 and unchanged
> since Spark 2.1.0. Additionally, the Kafka integration has been using them
> since Spark 2.1.0.
> Because structured streaming general availability was announced in
> Spark 2.2.0, I suspect there are other third-party integrations using them
> already as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725413#comment-16725413
 ] 

Gabor Somogyi commented on SPARK-26389:
---

spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not 
a Structured Streaming one.

> temp checkpoint folder at executor should be deleted on graceful shutdown
> -
>
> Key: SPARK-26389
> URL: https://issues.apache.org/jira/browse/SPARK-26389
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Fengyu Cao
>Priority: Major
>
> {{spark-submit --master mesos:// -conf 
> spark.streaming.stopGracefullyOnShutdown=true  framework>}}
> CTRL-C, framework shutdown
> {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = 
> f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 
> 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error 
> org.apache.spark.SparkException: Writing job aborted.}}
> {{/tmp/temporary- on executor not deleted due to 
> org.apache.spark.SparkException: Writing job aborted., and this temp 
> checkpoint can't used to recovery.}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725395#comment-16725395
 ] 

Gabor Somogyi edited comment on SPARK-26415 at 12/19/18 10:30 PM:
--

One minor thing: AFAIK target version should be set by committers.


was (Author: gsomogyi):
One minor thing: AFAIK targer version should be set by committers.

> Mark StreamSinkProvider and StreamSourceProvider as stable
> --
>
> Key: SPARK-26415
> URL: https://issues.apache.org/jira/browse/SPARK-26415
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Grant Henke
>Priority: Major
>
> This change marks the StreamSinkProvider and StreamSourceProvider
> interfaces as stable so that they can be relied on for compatibility for all of
> Spark 2.x.
> These interfaces have been available since Spark 2.0.0 and unchanged
> since Spark 2.1.0. Additionally, the Kafka integration has been using them
> since Spark 2.1.0.
> Because structured streaming general availability was announced in
> Spark 2.2.0, I suspect there are other third-party integrations using them
> already as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725395#comment-16725395
 ] 

Gabor Somogyi commented on SPARK-26415:
---

One minor thing: AFAIK targer version should be set by committers.

> Mark StreamSinkProvider and StreamSourceProvider as stable
> --
>
> Key: SPARK-26415
> URL: https://issues.apache.org/jira/browse/SPARK-26415
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Grant Henke
>Priority: Major
>
> This change marks the StreamSinkProvider and StreamSourceProvider
> interfaces as stable so that they can be relied on for compatibility for all of
> Spark 2.x.
> These interfaces have been available since Spark 2.0.0 and unchanged
> since Spark 2.1.0. Additionally, the Kafka integration has been using them
> since Spark 2.1.0.
> Because structured streaming general availability was announced in
> Spark 2.2.0, I suspect there are other third-party integrations using them
> already as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725393#comment-16725393
 ] 

Gabor Somogyi commented on SPARK-26396:
---

Should be.

Regarding the JIRA, the described use-case is not how it should be designed.
Unless you have further issues, I would like to close it with the info provided.
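
For reference: with 4 queries over a 90-partition topic (quoted below), a single executor can end up holding up to 360 consumers, which is why a capacity of 360 silences the warning. A minimal sketch of raising the cache capacity follows; the option name is my assumption about the sql-kafka connector setting, so please verify it for your version:

{code:python}
from pyspark.sql import SparkSession

# Hedged sketch: size the per-executor Kafka consumer cache for one consumer per
# (query, topic partition) pair, i.e. 4 queries * 90 partitions = 360.
spark = (SparkSession.builder
         .config("spark.sql.kafkaConsumerCache.capacity", 360)  # assumed option name; default is 64
         .getOrCreate())
{code}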

> Kafka consumer cache overflow since 2.4.x
> -
>
> Key: SPARK-26396
> URL: https://issues.apache.org/jira/browse/SPARK-26396
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4 standalone client mode
>Reporter: Kaspar Tint
>Priority: Major
>
> We are experiencing an issue where the Kafka consumer cache seems to overflow 
> constantly upon starting the application. This issue appeared after upgrading 
> to Spark 2.4.
> We would get constant warnings like this:
> {code:java}
> 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
> {code}
> This application is running 4 different Spark Structured Streaming queries 
> against the same Kafka topic that has 90 partitions. We used to run it with 
> just the default settings, so it defaulted to a cache size of 64 on Spark 2.3, but 
> now we have tried setting it to 180 or 360. With 360 we get a lot less noise 
> about the overflow, but the resource need increases substantially.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable

2018-12-19 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725376#comment-16725376
 ] 

Apache Spark commented on SPARK-26415:
--

User 'granthenke' has created a pull request for this issue:
https://github.com/apache/spark/pull/23354

> Mark StreamSinkProvider and StreamSourceProvider as stable
> --
>
> Key: SPARK-26415
> URL: https://issues.apache.org/jira/browse/SPARK-26415
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Grant Henke
>Priority: Major
>
> This change marks the StreamSinkProvider and StreamSourceProvider
> interfaces as stable so that they can be relied on for compatibility for all of
> Spark 2.x.
> These interfaces have been available since Spark 2.0.0 and unchanged
> since Spark 2.1.0. Additionally, the Kafka integration has been using them
> since Spark 2.1.0.
> Because structured streaming general availability was announced in
> Spark 2.2.0, I suspect there are other third-party integrations using them
> already as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable

2018-12-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26415:


Assignee: Apache Spark

> Mark StreamSinkProvider and StreamSourceProvider as stable
> --
>
> Key: SPARK-26415
> URL: https://issues.apache.org/jira/browse/SPARK-26415
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Grant Henke
>Assignee: Apache Spark
>Priority: Major
>
> This change marks the StreamSinkProvider and StreamSourceProvider
> interfaces as stable so that they can be relied on for compatibility for all of
> Spark 2.x.
> These interfaces have been available since Spark 2.0.0 and unchanged
> since Spark 2.1.0. Additionally, the Kafka integration has been using them
> since Spark 2.1.0.
> Because structured streaming general availability was announced in
> Spark 2.2.0, I suspect there are other third-party integrations using them
> already as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable

2018-12-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26415:


Assignee: (was: Apache Spark)

> Mark StreamSinkProvider and StreamSourceProvider as stable
> --
>
> Key: SPARK-26415
> URL: https://issues.apache.org/jira/browse/SPARK-26415
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Grant Henke
>Priority: Major
>
> This change marks the StreamSinkProvider and StreamSourceProvider
> interfaces as stable so that they can be relied on for compatibility for all of
> Spark 2.x.
> These interfaces have been available since Spark 2.0.0 and unchanged
> since Spark 2.1.0. Additionally, the Kafka integration has been using them
> since Spark 2.1.0.
> Because structured streaming general availability was announced in
> Spark 2.2.0, I suspect there are other third-party integrations using them
> already as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable

2018-12-19 Thread Grant Henke (JIRA)
Grant Henke created SPARK-26415:
---

 Summary: Mark StreamSinkProvider and StreamSourceProvider as stable
 Key: SPARK-26415
 URL: https://issues.apache.org/jira/browse/SPARK-26415
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.2.0
Reporter: Grant Henke


This change marks the StreamSinkProvider and StreamSourceProvider
interfaces as stable so that they can be relied on for compatibility for all of
Spark 2.x.

These interfaces have been available since Spark 2.0.0 and unchanged
since Spark 2.1.0. Additionally, the Kafka integration has been using them
since Spark 2.1.0.

Because structured streaming general availability was announced in
Spark 2.2.0, I suspect there are other third-party integrations using them
already as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26414) Race between SparkContext and YARN AM can cause NPE in UI setup code

2018-12-19 Thread Marcelo Vanzin (JIRA)
Marcelo Vanzin created SPARK-26414:
--

 Summary: Race between SparkContext and YARN AM can cause NPE in UI 
setup code
 Key: SPARK-26414
 URL: https://issues.apache.org/jira/browse/SPARK-26414
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 2.4.0
Reporter: Marcelo Vanzin


There's a super narrow race between the SparkContext and the AM startup code:

- SC starts the AM and waits for it to go into running state
- AM goes into running state, unblocking SC
- AM sends the AmIpFilter config to the SC, which adds the filter to its list first 
and the filter configs after
- the unblocked SC is in the middle of setting up the UI and sees only the filter, 
but not the configs

Then you get this:

{noformat}
ERROR org.apache.spark.SparkContext  - Error initializing SparkContext.
java.lang.NullPointerException
at 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.init(AmIpFilter.java:81)
at 
org.spark_project.jetty.servlet.FilterHolder.initialize(FilterHolder.java:139)
at 
org.spark_project.jetty.servlet.ServletHandler.initialize(ServletHandler.java:881)
at 
org.spark_project.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:349)
at 
org.spark_project.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:778)
at 
org.spark_project.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:262)
at 
org.spark_project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.apache.spark.ui.ServerInfo.addHandler(JettyUtils.scala:520)
at 
org.apache.spark.ui.WebUI$$anonfun$attachHandler$1.apply(WebUI.scala:96)
at 
org.apache.spark.ui.WebUI$$anonfun$attachHandler$1.apply(WebUI.scala:96)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.ui.WebUI.attachHandler(WebUI.scala:96)
at 
org.apache.spark.SparkContext$$anonfun$22$$anonfun$apply$8.apply(SparkContext.scala:522)
at 
org.apache.spark.SparkContext$$anonfun$22$$anonfun$apply$8.apply(SparkContext.scala:522)
at scala.Option.foreach(Option.scala:257)
{noformat}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21544) Test jar of some module should not install or deploy twice

2018-12-19 Thread Matt Foley (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725257#comment-16725257
 ] 

Matt Foley commented on SPARK-21544:


[~cane], I'm having exactly the problem described in SPARK-21544 with regard 
to spark-streaming in Spark 2.4.0.  The changes in PR 
[#18745|https://github.com/apache/spark/pull/18745] fixed the problem with 
sql/catalyst and sql/core, but did not change streaming. Did you find a 
solution for it? Any help would be greatly appreciated.

> Test jar of some module should not install or deploy twice
> --
>
> Key: SPARK-21544
> URL: https://issues.apache.org/jira/browse/SPARK-21544
> Project: Spark
>  Issue Type: Improvement
>  Components: Deploy
>Affects Versions: 1.6.1, 2.1.0
>Reporter: zhoukang
>Assignee: zhoukang
>Priority: Minor
> Fix For: 2.3.0
>
>
> For the modules below:
> common/network-common
> streaming
> sql/core
> sql/catalyst
> the tests.jar will be installed or deployed twice, like:
> {code:java}
> [INFO] Installing 
> /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar
>  to 
> /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar
> [DEBUG] Writing tracking file 
> /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/_remote.repositories
> [DEBUG] Installing 
> org.apache.spark:spark-streaming_2.11:2.1.0-mdh2.1.0.1-SNAPSHOT/maven-metadata.xml
>  to 
> /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/maven-metadata-local.xml
> [DEBUG] Installing org.apache.spark:spark-streaming_2.11/maven-metadata.xml 
> to 
> /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/maven-metadata-local.xml
> [INFO] Installing 
> /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar
>  to 
> /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar
> [DEBUG] Skipped re-installing 
> /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar
>  to 
> /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar,
>  seems unchanged
> {code}
> The reason is below:
> {code:java}
> [DEBUG]   (f) artifact = 
> org.apache.spark:spark-streaming_2.11:jar:2.1.0-mdh2.1.0.1-SNAPSHOT
> [DEBUG]   (f) attachedArtifacts = 
> [org.apache.spark:spark-streaming_2.11:test-jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT,
>  org.apache.spark:spark-streaming_2.11:jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, 
> org.apache.spark:spark
> -streaming_2.11:java-source:sources:2.1.0-mdh2.1.0.1-SNAPSHOT, 
> org.apache.spark:spark-streaming_2.11:java-source:test-sources:2.1.0-mdh2.1.0.1-SNAPSHOT,
>  org.apache.spark:spark-streaming_2.11:javadoc:javadoc:2.1.0
> -mdh2.1.0.1-SNAPSHOT]
> {code}
> When executing 'mvn deploy' to Nexus during a release, it will fail since the 
> release Nexus cannot be overridden.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2018-12-19 Thread Richard Whitcomb (JIRA)
Richard Whitcomb created SPARK-26413:


 Summary: SPIP: RDD Arrow Support in Spark Core and PySpark
 Key: SPARK-26413
 URL: https://issues.apache.org/jira/browse/SPARK-26413
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, Spark Core
Affects Versions: 3.0.0
Reporter: Richard Whitcomb


h2. Background and Motivation

Arrow is becoming a standard interchange format for columnar structured data.  
This is already true in Spark with the use of Arrow in the Pandas UDF functions 
in the DataFrame API.

However, the current implementation of Arrow in Spark is limited to two use 
cases:
 * Pandas UDFs that allow for operations on one or more columns in the 
DataFrame API.
 * Collect as Pandas, which pulls back the entire dataset to the driver as a 
Pandas DataFrame.

What is still hard, however, is making use of all of the columns in a DataFrame 
while staying distributed across the workers.  The only way to do this 
currently is to drop down into RDDs and collect the rows into a dataframe; 
however, pickling is very slow and the collecting is expensive.

The proposal is to extend Spark in a way that allows users to operate on an 
Arrow Table fully while still making use of Spark's underlying technology.  
Some examples of possibilities with this new API:
 * Pass the Arrow Table with Zero Copy to PyTorch for predictions.
 * Pass to Nvidia Rapids for an algorithm to be run on the GPU.
 * Distribute data across many GPUs making use of the new Barriers API.

h2. Target users and personas

ML engineers, data scientists, and future library authors.
h2. Goals
 * Conversion from any Dataset[Row] or PySpark DataFrame to RDD[Table]
 * Conversion back from any RDD[Table] to Dataset[Row], RDD[Row], or a PySpark 
DataFrame
 * Open the possibility of tighter integration between Arrow/Pandas/Spark, 
especially at a library level.

h2. Non-Goals
 * Not creating a new API but instead using existing APIs.

h2. Proposed API changes
h3. Data Objects

case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
h3. Dataset.scala
{code:java}
// Converts a Dataset to an RDD of Arrow Tables
// Each RDD row is an Iterable of Arrow Batches.
def arrowRDD: RDD[ArrowTable]
 
// Utility Function to convert to RDD Arrow Table for PySpark
private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]]
{code}
h3. RDD.scala
{code:java}
 // Converts RDD[ArrowTable] to a DataFrame by inspecting the Arrow Schema
 def arrowToDataframe(implicit ev: T <:< ArrowTable): Dataframe
  
 // Converts RDD[ArrowTable] to an RDD of Rows
 def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]{code}
h3. Serializers.py
{code:java}
# Serializer that takes serialized Arrow Tables and returns a pyarrow Table.
class ArrowSerializer(FramedSerializer)
{code}
h3. RDD.py
{code}
# New RDD Class that has an RDD[ArrowTable] behind it and uses the new 
ArrowSerializer instead of the normal Pickle Serializer
class ArrowRDD(RDD){code}
 
h3. Dataframe.py
{code}
# New function that converts a PySpark DataFrame into an ArrowRDD
def arrow(self):
{code}
 
h2. Example API Usage
h3. Pyspark
{code}
# Select a Single Column Using Pandas
def map_table(arrow_table):
  import pyarrow as pa
  pdf = arrow_table.to_pandas()
  pdf = pdf[['email']]
  return pa.Table.from_pandas(pdf)
# Convert to Arrow RDD, map over tables, convert back to dataframe
df.arrow.map(map_table).dataframe 
{code}
h3. Scala

 
{code:java}
// Find N Centroids using Cuda Rapids kMeans
def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable
 
// Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row]
df.arrowRDD.map(table => runCuKmeans(table, N)).arrowToDataframe.show(10)
{code}
 
h2. Implementation Details

As mentioned in the first section, the goal is to make it easier for Spark 
users to interact with Arrow tools and libraries.  This, however, does come with 
some considerations from a Spark perspective.

Arrow is column-based instead of row-based.  In the above API proposal of 
RDD[ArrowTable], each RDD row will in fact be a block of data.  Another proposal 
in this regard is to introduce a new parameter to Spark called 
arrow.sql.execution.arrow.maxRecordsPerTable.  The goal of this parameter is to 
decide how many records are included in a single Arrow Table.  If set to -1, the 
entire partition will be included in the table; otherwise, up to that number of 
records will be.  Within that number, the normal blocking mechanisms of Arrow are 
used to include multiple batches.  This is still dictated by arrowMaxRecordsPerBatch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26402) Canonicalization on GetStructField

2018-12-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26402:


Assignee: DB Tsai  (was: Apache Spark)

> Canonicalization on GetStructField
> --
>
> Key: SPARK-26402
> URL: https://issues.apache.org/jira/browse/SPARK-26402
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: DB Tsai
>Assignee: DB Tsai
>Priority: Major
>
> GetStructField with different optional names should be semantically equal. We 
> will use this as a building block to compare the nested fields used in the 
> plans to be optimized by the Catalyst optimizer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26402) Canonicalization on GetStructField

2018-12-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26402:


Assignee: Apache Spark  (was: DB Tsai)

> Canonicalization on GetStructField
> --
>
> Key: SPARK-26402
> URL: https://issues.apache.org/jira/browse/SPARK-26402
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: DB Tsai
>Assignee: Apache Spark
>Priority: Major
>
> GetStructField with different optional names should be semantically equal. We 
> will use this as a building block to compare the nested fields used in the 
> plans to be optimized by the Catalyst optimizer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26402) Canonicalization on GetStructField

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725256#comment-16725256
 ] 

ASF GitHub Bot commented on SPARK-26402:


dbtsai opened a new pull request #23353: [SPARK-26402][SQL] Canonicalization on 
GetStructField 
URL: https://github.com/apache/spark/pull/23353
 
 
   ## What changes were proposed in this pull request?
   
   GetStructField with different optional names should be semantically equal. 
We will use this as a building block to compare the nested fields used in the 
plans to be optimized by the Catalyst optimizer.
   
   ## How was this patch tested?
   
   New tests are added.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Canonicalization on GetStructField
> --
>
> Key: SPARK-26402
> URL: https://issues.apache.org/jira/browse/SPARK-26402
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: DB Tsai
>Assignee: DB Tsai
>Priority: Major
>
> GetStructField with different optional names should be semantically equal. We 
> will use this as a building block to compare the nested fields used in the 
> plans to be optimized by the Catalyst optimizer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26388) No support for "alter table .. replace columns" to drop columns

2018-12-19 Thread nirav patel (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725244#comment-16725244
 ] 

nirav patel commented on SPARK-26388:
-

Updated the description with the requested detail and a sample.
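
For reference, a minimal repro sketch of the statements quoted below (`spark` is assumed to be an existing SparkSession; the table names are the ones from the report). On the affected versions, the second statement fails with the ParseException shown under *Actual Result*:

{code:python}
# Hedged repro sketch: on Spark 2.2.x/2.3.x the REPLACE COLUMNS statement raises
# org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: alter table replace columns
spark.sql("CREATE TABLE myschema.mytable (a INT, b INT, c INT)")
spark.sql("ALTER TABLE myschema.mytable REPLACE COLUMNS (a INT, b INT, d INT)")
{code}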

> No support for "alter table .. replace columns" to drop columns
> ---
>
> Key: SPARK-26388
> URL: https://issues.apache.org/jira/browse/SPARK-26388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1, 2.3.1, 2.3.2
>Reporter: nirav patel
>Priority: Major
>
> Looks like hive {{replace columns}} is not working with spark 2.2.1 and 2.3.1
>   
> create table myschema.mytable(a int, b int, c int)
> alter table myschema.mytable replace columns (a int,b int,d int)
>  
> *Expected Behavior*
> it should drop column c and add column d.
> alter table... replace columns.. should work just as it works in hive.
> It replaces existing columns with new ones and deletes any column that is not 
> mentioned.
>  
> here's the snippet of hive cli:
> hive> desc mytable;
> OK
> a                   int                                     
> b                   int                                     
> c                   int                                     
> Time taken: 0.05 seconds, Fetched: 3 row(s)
> hive> alter table mytable replace columns(a int, b int, d int);
> OK
> Time taken: 0.078 seconds
> hive> desc mytable;
> OK
> a                   int                                     
> b                   int                                     
> d                   int                                     
> Time taken: 0.03 seconds, Fetched: 3 row(s)
>  
> *Actual Result*
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: 
> alter table replace columns
>  {{ADD COLUMNS}} works, which seems to have been previously reported and fixed as well:
> https://issues.apache.org/jira/browse/SPARK-18893
>  
> Replace columns should be supported as well. AFAIK, that's the only way to 
> delete Hive columns.
>   
>   
>  It is supposed to work according to these docs:
>  
> [https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns]
>  
> [https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features]
>   
>  but it's throwing an error for me on 2 different versions.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition

2018-12-19 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-26412:

Target Version/s: 3.0.0

> Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition
> --
>
> Key: SPARK-26412
> URL: https://issues.apache.org/jira/browse/SPARK-26412
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> Pandas UDF is the ideal connection between PySpark and DL model inference 
> workloads. However, the user needs to load the model file first to make 
> predictions. It is common to see models of size ~100MB or bigger. If the 
> Pandas UDF execution is limited to batch scope, the user needs to repeatedly load 
> the same model for every batch in the same Python worker process, which is 
> inefficient. I created this JIRA to discuss possible solutions.
> Essentially we need to support "start()" and "finish()" besides "apply". We 
> can either provide those interfaces or simply provide users with the iterator of 
> batches as pd.DataFrames and let user code handle it.
> cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26388) No support for "alter table .. replace columns" to drop columns

2018-12-19 Thread nirav patel (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

nirav patel updated SPARK-26388:

Description: 
Looks like hive {{replace columns}} is not working with spark 2.2.1 and 2.3.1
  

create table myschema.mytable(a int, b int, c int)

alter table myschema.mytable replace columns (a int,b int,d int)

 

*Expected Behavior*

it should drop column c and add column d.

alter table... replace columns.. should work just as it works in hive.

It replaces existing columns with new ones and deletes any column that is not 
mentioned.

 

here's the snippet of hive cli:

hive> desc mytable;

OK

a                   int                                     

b                   int                                     

c                   int                                     

Time taken: 0.05 seconds, Fetched: 3 row(s)

hive> alter table mytable replace columns(a int, b int, d int);

OK

Time taken: 0.078 seconds

hive> desc mytable;

OK

a                   int                                     

b                   int                                     

d                   int                                     

Time taken: 0.03 seconds, Fetched: 3 row(s)

 

*Actual Result*

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
Operation not allowed: alter table replace columns


 {{ADD COLUMNS}} works, which seems to have been previously reported and fixed as well:

https://issues.apache.org/jira/browse/SPARK-18893

 

Replace columns should be supported as well. AFAIK, that's the only way to 
delete Hive columns.
  
  
 It is supposed to work according to these docs:
 
[https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns]
 
[https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features]
  
 but it's throwing an error for me on 2 different versions.

 

 

  was:
Looks like hive {{replace columns}} is not working with spark 2.2.1 and 2.3.1
 
{{alterSchemaSql : alter table myschema.mytable replace columns (a int,b int,d 
int) Exception in thread "main" 
org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: 
alter table replace columns(line 2, pos 6) }}
{{ADD COLUMNS}} works, which seems to have been previously reported and fixed as well:

https://issues.apache.org/jira/browse/SPARK-18893

 

Replace columns should be supported as well. AFAIK, that's the only way to 
delete Hive columns.
 
 
It is supposed to work according to these docs:
[https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns]
[https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features]
 
but it's throwing an error for me on 2 different versions.


> No support for "alter table .. replace columns" to drop columns
> ---
>
> Key: SPARK-26388
> URL: https://issues.apache.org/jira/browse/SPARK-26388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1, 2.3.1, 2.3.2
>Reporter: nirav patel
>Priority: Major
>
> Looks like hive {{replace columns}} is not working with spark 2.2.1 and 2.3.1
>   
> create table myschema.mytable(a int, b int, c int)
> alter table myschema.mytable replace columns (a int,b int,d int)
>  
> *Expected Behavior*
> it should drop column c and add column d.
> alter table... replace columns.. should work just as it works in hive.
> It replaces existing columns with new ones and deletes any column that is not 
> mentioned.
>  
> here's the snippet of hive cli:
> hive> desc mytable;
> OK
> a                   int                                     
> b                   int                                     
> c                   int                                     
> Time taken: 0.05 seconds, Fetched: 3 row(s)
> hive> alter table mytable replace columns(a int, b int, d int);
> OK
> Time taken: 0.078 seconds
> hive> desc mytable;
> OK
> a                   int                                     
> b                   int                                     
> d                   int                                     
> Time taken: 0.03 seconds, Fetched: 3 row(s)
>  
> *Actual Result*
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: 
> alter table replace columns
>  {{ADD COLUMNS}} works, which seems to have been previously reported and fixed as well:
> https://issues.apache.org/jira/browse/SPARK-18893
>  
> Replace columns should be supported as well. AFAIK, that's the only way to 
> delete Hive columns.
>   
>   
>  It is supposed to work according to these docs:
>  
> [https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns]
>  
> 

[jira] [Created] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition

2018-12-19 Thread Xiangrui Meng (JIRA)
Xiangrui Meng created SPARK-26412:
-

 Summary: Allow Pandas UDF to take an iterator of pd.DataFrames for 
the entire partition
 Key: SPARK-26412
 URL: https://issues.apache.org/jira/browse/SPARK-26412
 Project: Spark
  Issue Type: New Feature
  Components: PySpark
Affects Versions: 3.0.0
Reporter: Xiangrui Meng


Pandas UDF is the ideal connection between PySpark and DL model inference 
workloads. However, the user needs to load the model file first to make predictions. 
It is common to see models of size ~100MB or bigger. If the Pandas UDF 
execution is limited to batch scope, the user needs to repeatedly load the same 
model for every batch in the same Python worker process, which is inefficient. 
I created this JIRA to discuss possible solutions.

Essentially we need to support "start()" and "finish()" besides "apply". We can 
either provide those interfaces or simply provide users with the iterator of batches 
as pd.DataFrames and let user code handle it.

cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]
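
A rough sketch of the iterator-of-batches option discussed above; the function shape and the `load_model` helper are illustrative assumptions, not an existing API:

{code:python}
import pandas as pd

# Hedged sketch: the "start()" work (loading the ~100MB model) happens once per
# Python worker, after which every incoming batch is scored.
def predict_batches(batch_iter):
    model = load_model("/path/to/model")    # hypothetical helper, called once
    for pdf in batch_iter:                  # pdf: one pd.DataFrame per batch
        yield pd.Series(model.predict(pdf))
    # "finish()"-style cleanup would go here
{code}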



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26410) Support per Pandas UDF configuration

2018-12-19 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng updated SPARK-26410:
--
Description: 
We use a "maxRecordsPerBatch" conf to control the batch sizes. However, the 
"right" batch size usually depends on the task itself. It would be nice if users 
could configure the batch size when they declare the Pandas UDF.

This is orthogonal to SPARK-23258 (using max buffer size instead of row count).

Besides API, we should also discuss how to merge Pandas UDFs of different 
configurations. For example,

{code}
df.select(predict1(col("features")), predict2(col("features")))
{code}

when predict1 requests 100 rows per batch, while predict2 requests 120 rows per 
batch.

cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]

  was:
We use a "maxRecordsPerBatch" conf to control the batch sizes. However, the 
"right" batch size usually depends on the task itself. It would be nice if users 
could configure the batch size when they declare the Pandas UDF.

This is orthogonal to SPARK-23258 (using max buffer size instead of row count).

Besides API, we should also discuss how to merge Pandas UDFs of different 
configurations. For example,

{code}
df.select(predict1(col("features")), predict2(col("features")))
{code}

when predict1 requests 100 rows per batch, while predict2 requests 120 rows per 
batch.

cc: [~icexelloss] [~bryanc] [~holdenk] [~ueshin] [~smilegator]


> Support per Pandas UDF configuration
> 
>
> Key: SPARK-26410
> URL: https://issues.apache.org/jira/browse/SPARK-26410
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> We use a "maxRecordsPerBatch" conf to control the batch sizes. However, the 
> "right" batch size usually depends on the task itself. It would be nice if 
> users could configure the batch size when they declare the Pandas UDF.
> This is orthogonal to SPARK-23258 (using max buffer size instead of row 
> count).
> Besides API, we should also discuss how to merge Pandas UDFs of different 
> configurations. For example,
> {code}
> df.select(predict1(col("features")), predict2(col("features")))
> {code}
> when predict1 requests 100 rows per batch, while predict2 requests 120 rows 
> per batch.
> cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26409) SQLConf should be serializable in test sessions

2018-12-19 Thread Gengliang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-26409:
---
Description: 
`SQLConf` is supposed to be serializable. However, currently it is not  
serializable in `WithTestConf`. `WithTestConf` uses the method `overrideConfs` 
in a closure, while the classes which implement it 
(`TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder`) are not 
serializable.

Use a local variable to fix it.

  was:
Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the 
method `overrideConfs` in a closure, while the classes 
`TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not 
serializable.

This PR is to use a local variable to fix it.



> SQLConf should be serializable in test sessions
> ---
>
> Key: SPARK-26409
> URL: https://issues.apache.org/jira/browse/SPARK-26409
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Tests
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Priority: Major
>
> `SQLConf` is supposed to be serializable. However, currently it is not  
> serializable in `WithTestConf`. `WithTestConf` uses the method 
> `overrideConfs` in a closure, while the classes which implement it 
> (`TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder`) are not 
> serializable.
> Use a local variable to fix it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26411) Streaming: _spark_metadata and checkpoints out of sync cause checkpoint packing failure

2018-12-19 Thread Alexander Panzhin (JIRA)
Alexander Panzhin created SPARK-26411:
-

 Summary: Streaming: _spark_metadata and checkpoints out of sync 
cause checkpoint packing failure
 Key: SPARK-26411
 URL: https://issues.apache.org/jira/browse/SPARK-26411
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Alexander Panzhin


A Spark Structured Streaming file source to file sink query seems to be picking up 
information from the `_spark_metadata` directory for checkpoint data compaction.

The worst part is that, with the output and checkpoint out of sync, data is not being 
written.

*This is not documented anywhere. Removing checkpoint data and leaving 
_spark_metadata in the output directory WILL CAUSE data loss.*

 

FileSourceScanExec.createNonBucketedReadRDD kicks off compaction and fails the 
whole job, because it expects deltas to be present.
But the delta files are never written because FileStreamSink.addBatch doesn't 
execute the Dataframe that it receives.
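
For context, a minimal sketch of the coupling described above (the source, schema, and paths are placeholders): the sink's `_spark_metadata` directory lives under the output path and must be kept, moved, or deleted together with the checkpoint directory.

{code:python}
# Hedged sketch: both directories below track query progress; removing only one of
# them (e.g. the checkpoint) leaves them out of sync and can lose data silently.
query = (spark.readStream.schema(schema).json("hdfs:///in")        # placeholder source
         .writeStream.format("parquet")
         .option("path", "hdfs:///out")                            # contains _spark_metadata
         .option("checkpointLocation", "hdfs:///out-checkpoint")   # contains offsets and state deltas
         .start())
{code}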


{code:java}
...
INFO  [2018-12-17 03:20:02,784] 
org.apache.spark.sql.execution.streaming.FileStreamSink: Skipping already 
committed batch 75 
...
INFO [2018-12-17 03:30:01,691] 
org.apache.spark.sql.execution.streaming.FileStreamSource: Log offset set to 76 
with 29 new files INFO [2018-12-17 03:30:01,700] 
org.apache.spark.sql.execution.streaming.MicroBatchExecution: Committed offsets 
for batch 76. Metadata 
OffsetSeqMetadata(0,1545017401691,Map(spark.sql.shuffle.partitions -> 200, 
spark.sql.streaming.stateStore.providerClass -> 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider)) 
INFO [2018-12-17 03:30:01,704] 
org.apache.spark.sql.execution.streaming.FileStreamSource: Processing 29 files 
from 76:76 INFO [2018-12-17 03:30:01,983] 
org.apache.spark.sql.execution.datasources.FileSourceStrategy: Pruning 
directories with: INFO [2018-12-17 03:30:01,983] 
org.apache.spark.sql.execution.datasources.FileSourceStrategy: Post-Scan 
Filters: INFO [2018-12-17 03:30:01,984] 
org.apache.spark.sql.execution.datasources.FileSourceStrategy: Output Data 
Schema: struct INFO [2018-12-17 03:30:01,984] 
org.apache.spark.sql.execution.FileSourceScanExec: Pushed Filters: INFO 
[2018-12-17 03:30:02,581] 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated 
in 16.205011 ms INFO [2018-12-17 03:30:02,593] 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated 
in 9.368244 ms INFO [2018-12-17 03:30:02,629] 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated 
in 31.126375 ms INFO [2018-12-17 03:30:02,640] org.apache.spark.SparkContext: 
Created broadcast 86 from start at SourceStream.scala:55 INFO [2018-12-17 
03:30:02,643] org.apache.spark.sql.execution.FileSourceScanExec: Planning scan 
with bin packing, max size: 14172786 bytes, open cost is considered as scanning 
4194304 bytes. INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: 
Cleaned accumulator 4321 INFO [2018-12-17 03:30:02,700] 
org.apache.spark.ContextCleaner: Cleaned accumulator 4326 INFO [2018-12-17 
03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4324 INFO 
[2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 
4320 INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned 
accumulator 4325 INFO [2018-12-17 03:30:02,737] org.apache.spark.SparkContext: 
Created broadcast 87 from start at SourceStream.scala:55 INFO [2018-12-17 
03:30:02,756] org.apache.spark.SparkContext: Starting job: start at 
SourceStream.scala:55 INFO [2018-12-17 03:30:02,761] 
org.apache.spark.SparkContext: Created broadcast 88 from broadcast at 
DAGScheduler.scala:1079 INFO [2018-12-17 03:30:03,860] 
org.apache.spark.ExecutorAllocationManager: Requesting 3 new executors because 
tasks are backlogged (new desired total will be 3) INFO [2018-12-17 
03:30:04,863] org.apache.spark.ExecutorAllocationManager: Requesting 1 new 
executor because tasks are backlogged (new desired total will be 4) INFO 
[2018-12-17 03:30:06,545] org.apache.spark.SparkContext: Created broadcast 89 
from broadcast at DAGScheduler.scala:1079 WARN [2018-12-17 03:30:07,214] 
org.apache.spark.scheduler.TaskSetManager: Lost task 19.0 in stage 87.0 (TID 
6145, ip-10-172-18-94.ec2.internal, executor 1): 
java.lang.IllegalStateException: Error reading delta file 
hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19/1.delta
 of HDFSStateStoreProvider[id = (op=0,part=19),dir = 
hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19]:
 
hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19/1.delta
 does not exist at 

[jira] [Commented] (SPARK-26390) ColumnPruning rule should only do column pruning

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725216#comment-16725216
 ] 

ASF GitHub Bot commented on SPARK-26390:


asfgit closed pull request #23343: [SPARK-26390][SQL] ColumnPruning rule should 
only do column pruning
URL: https://github.com/apache/spark/pull/23343
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3eb6bca6ec976..44d5543114902 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -93,7 +93,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
 RewriteCorrelatedScalarSubquery,
 EliminateSerialization,
 RemoveRedundantAliases,
-RemoveRedundantProject,
+RemoveNoopOperators,
 SimplifyExtractValueOps,
 CombineConcats) ++
 extendedOperatorOptimizationRules
@@ -177,7 +177,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
   RewritePredicateSubquery,
   ColumnPruning,
   CollapseProject,
-  RemoveRedundantProject) :+
+  RemoveNoopOperators) :+
 Batch("UpdateAttributeReferences", Once,
   UpdateNullabilityInAttributeReferences)
   }
@@ -403,11 +403,15 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
 }
 
 /**
- * Remove projections from the query plan that do not make any modifications.
+ * Remove no-op operators from the query plan that do not make any 
modifications.
  */
-object RemoveRedundantProject extends Rule[LogicalPlan] {
+object RemoveNoopOperators extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case p @ Project(_, child) if p.output == child.output => child
+// Eliminate no-op Projects
+case p @ Project(_, child) if child.sameOutput(p) => child
+
+// Eliminate no-op Window
+case w: Window if w.windowExpressions.isEmpty => w.child
   }
 }
 
@@ -602,17 +606,12 @@ object ColumnPruning extends Rule[LogicalPlan] {
   p.copy(child = w.copy(
 windowExpressions = w.windowExpressions.filter(p.references.contains)))
 
-// Eliminate no-op Window
-case w: Window if w.windowExpressions.isEmpty => w.child
-
-// Eliminate no-op Projects
-case p @ Project(_, child) if child.sameOutput(p) => child
-
 // Can't prune the columns on LeafNode
 case p @ Project(_, _: LeafNode) => p
 
 // for all other logical plans that inherits the output from it's children
-case p @ Project(_, child) =>
+// Project over project is handled by the first case, skip it here.
+case p @ Project(_, child) if !child.isInstanceOf[Project] =>
   val required = child.references ++ p.references
   if (!child.inputSet.subsetOf(required)) {
 val newChildren = child.children.map(c => prunedChild(c, required))
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 8d7c9bf220bc2..57195d5fda7c5 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -34,6 +34,7 @@ class ColumnPruningSuite extends PlanTest {
 val batches = Batch("Column pruning", FixedPoint(100),
   PushDownPredicate,
   ColumnPruning,
+  RemoveNoopOperators,
   CollapseProject) :: Nil
   }
 
@@ -340,10 +341,8 @@ class ColumnPruningSuite extends PlanTest {
   test("Column pruning on Union") {
 val input1 = LocalRelation('a.int, 'b.string, 'c.double)
 val input2 = LocalRelation('c.int, 'd.string, 'e.double)
-val query = Project('b :: Nil,
-  Union(input1 :: input2 :: Nil)).analyze
-val expected = Project('b :: Nil,
-  Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: 
Nil)).analyze
+val query = Project('b :: Nil, Union(input1 :: input2 :: Nil)).analyze
+val expected = Union(Project('b :: Nil, input1) :: Project('d :: Nil, 
input2) :: Nil).analyze
 comparePlans(Optimize.execute(query), expected)
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index ef4b848924f06..b190dd5a7c220 100644
--- 

[jira] [Resolved] (SPARK-26390) ColumnPruning rule should only do column pruning

2018-12-19 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-26390.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

This is resolved via https://github.com/apache/spark/pull/23343

> ColumnPruning rule should only do column pruning
> 
>
> Key: SPARK-26390
> URL: https://issues.apache.org/jira/browse/SPARK-26390
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26410) Support per Pandas UDF configuration

2018-12-19 Thread Xiangrui Meng (JIRA)
Xiangrui Meng created SPARK-26410:
-

 Summary: Support per Pandas UDF configuration
 Key: SPARK-26410
 URL: https://issues.apache.org/jira/browse/SPARK-26410
 Project: Spark
  Issue Type: New Feature
  Components: PySpark
Affects Versions: 3.0.0
Reporter: Xiangrui Meng


We use a "maxRecordsPerBatch" conf to control the batch sizes. However, the 
"right" batch size usually depends on the task itself. It would be nice if user 
can configure the batch size when they declare the Pandas UDF.

This is orthogonal to SPARK-23258 (using max buffer size instead of row count).

Besides API, we should also discuss how to merge Pandas UDFs of different 
configurations. For example,

{code}
df.select(predict1(col("features"), predict2(col("features")))
{code}

when predict1 requests 100 rows per batch, while predict2 requests 120 rows per 
batch.

cc: [~icexelloss] [~bryanc] [~holdenk] [~ueshin] [~smilegator]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26409) SQLConf should be serializable in test sessions

2018-12-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26409:


Assignee: Apache Spark

> SQLConf should be serializable in test sessions
> ---
>
> Key: SPARK-26409
> URL: https://issues.apache.org/jira/browse/SPARK-26409
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Tests
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Assignee: Apache Spark
>Priority: Major
>
> Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the 
> method `overrideConfs` in a closure, while the classes 
> `TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not 
> serializable.
> This PR uses a local variable to fix it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26409) SQLConf should be serializable in test sessions

2018-12-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26409:


Assignee: (was: Apache Spark)

> SQLConf should be serializable in test sessions
> ---
>
> Key: SPARK-26409
> URL: https://issues.apache.org/jira/browse/SPARK-26409
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Tests
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Priority: Major
>
> Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the 
> method `overrideConfs` in a closure, while the classes 
> `TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not 
> serializable.
> This PR uses a local variable to fix it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26409) SQLConf should be serializable in test sessions

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725190#comment-16725190
 ] 

ASF GitHub Bot commented on SPARK-26409:


gengliangwang opened a new pull request #23352: [SPARK-26409][SQL][TESTS] 
SQLConf should be serializable in test sessions
URL: https://github.com/apache/spark/pull/23352
 
 
   ## What changes were proposed in this pull request?
   
   Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the 
   method `overrideConfs` in a closure, while the classes 
   `TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not 
   serializable.
   
   This PR uses a local variable to fix it.
   
   ## How was this patch tested?
   
   Add unit test.
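   
   For readers outside the test code, a minimal sketch of the capture problem and 
   the local-variable fix, using simplified stand-in classes rather than the real 
   Spark session-state builders:
   
   ```
   // Sketch only: Conf and NonSerializableBuilder are simplified stand-ins,
   // not the real SQLConf / TestHiveSessionStateBuilder classes.
   class Conf(defaults: () => Map[String, String]) extends Serializable

   class NonSerializableBuilder {
     def overrideConfs: Map[String, String] = Map("k" -> "v")

     // Problematic: the closure references this.overrideConfs, so it captures
     // the whole non-serializable builder instance.
     def confCapturingThis: Conf = new Conf(() => overrideConfs)

     // The fix described above: copy the value into a local first, so the
     // closure only captures a plain, serializable Map.
     def confCapturingLocal: Conf = {
       val confs = overrideConfs
       new Conf(() => confs)
     }
   }
   ```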


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SQLConf should be serializable in test sessions
> ---
>
> Key: SPARK-26409
> URL: https://issues.apache.org/jira/browse/SPARK-26409
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Tests
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Priority: Major
>
> Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the 
> method `overrideConfs` in a closure, while the classes 
> `TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not 
> serializable.
> This PR uses a local variable to fix it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26409) SQLConf should be serializable in test sessions

2018-12-19 Thread Gengliang Wang (JIRA)
Gengliang Wang created SPARK-26409:
--

 Summary: SQLConf should be serializable in test sessions
 Key: SPARK-26409
 URL: https://issues.apache.org/jira/browse/SPARK-26409
 Project: Spark
  Issue Type: Improvement
  Components: SQL, Tests
Affects Versions: 3.0.0
Reporter: Gengliang Wang


Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the 
method `overrideConfs` in a closure, while the classes 
`TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not 
serializable.

This PR uses a local variable to fix it.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26408) java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347)

2018-12-19 Thread Amit Siddhu (JIRA)
Amit Siddhu created SPARK-26408:
---

 Summary: java.util.NoSuchElementException: None.get at 
scala.None$.get(Option.scala:347)
 Key: SPARK-26408
 URL: https://issues.apache.org/jira/browse/SPARK-26408
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.2
 Environment: Spark version 2.3.2

Scala version 2.11.8

Hbase version 1.4.7
Reporter: Amit Siddhu


{code:java}
sudo spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/
{code}
{code:java}
import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}
import spark.sqlContext.implicits._
{code}
{code:java}
def withCatalog(cat: String): DataFrame = {
  spark.sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog->cat))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()
}
{code}
{code:java}
def motorQuoteCatatog = s"""{ |"table":{"namespace":"default", 
"name":"public.motor_product_quote", "tableCoder":"PrimitiveType"}, 
|"rowkey":"id", |"columns":{ |"id":{"cf":"rowkey", "col":"id", 
"type":"string"}, |"quote_id":{"cf":"motor_product_quote", "col":"quote_id", 
"type":"string"}, |"vehicle_id":{"cf":"motor_product_quote", 
"col":"vehicle_id", "type":"bigint"}, |"is_new":{"cf":"motor_product_quote", 
"col":"is_new", "type":"boolean"}, 
|"date_of_manufacture":{"cf":"motor_product_quote", 
"col":"date_of_manufacture", "type":"string"}, 
|"raw_data":{"cf":"motor_product_quote", "col":"raw_data", "type":"string"}, 
|"is_processed":{"cf":"motor_product_quote", "col":"is_processed", 
"type":"boolean"}, |"created_on":{"cf":"motor_product_quote", 
"col":"created_on", "type":"string"}, |"type":{"cf":"motor_product_quote", 
"col":"type", "type":"string"}, |"requirement_id":{"cf":"motor_product_quote", 
"col":"requirement_id", "type":"int"}, 
|"previous_policy_id":{"cf":"motor_product_quote", "col":"type", 
"previous_policy_id":"int"}, |"parent_quote_id":{"cf":"motor_product_quote", 
"col":"type", "parent_quote_id":"int"}, 
|"ticket_id":{"cf":"motor_product_quote", "col":"type", "ticket_id":"int"}, 
|"tracker_id":{"cf":"motor_product_quote", "col":"tracker_id", "type":"int"}, 
|"category":{"cf":"motor_product_quote", "col":"category", "type":"string"}, 
|"sales_channel_id":{"cf":"motor_product_quote", "col":"sales_channel_id", 
"type":"int"}, |"policy_type":{"cf":"motor_product_quote", "col":"policy_type", 
"type":"string"}, |"original_quote_created_by_id":{"cf":"motor_product_quote", 
"col":"type", "original_quote_created_by_id":"int"}, 
|"created_by_id":{"cf":"motor_product_quote", "col":"created_by_id", 
"type":"int"}, |"mobile":{"cf":"motor_product_quote", "col":"mobile", 
"type":"string"}, |"registration_number":{"cf":"motor_product_quote", 
"col":"registration_number", "type":"string"} |} |}""".stripMargin
{code}
 
{code:java}
val df = withCatalog(motorQuoteCatatog){code}
{code:java}
java.util.NoSuchElementException: None.get
 at scala.None$.get(Option.scala:347)
 at scala.None$.get(Option.scala:345)
 at org.apache.spark.sql.execution.datasources.hbase.Field.<init>(HBaseTableCatalog.scala:102)
 at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$$anonfun$apply$3.apply(HBaseTableCatalog.scala:286)
 at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$$anonfun$apply$3.apply(HBaseTableCatalog.scala:281)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:281)
 at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
 at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
 at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
 at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
 at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
 at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
 at withCatalog(<console>:38)
 ... 55 elided
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-19 Thread Kaspar Tint (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725104#comment-16725104
 ] 

Kaspar Tint commented on SPARK-26396:
-

So in case there are four group.ids all consuming the same topic with 90 
partitions, 360 for one JVM should be correct, right?
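
As a quick back-of-the-envelope check of that number (a sketch only, assuming one 
cached consumer per (topic, partition, group.id) combination held in a single 
executor JVM, as discussed in this thread):

{code:java}
// Rough worst-case sizing of the executor-side Kafka consumer cache,
// assuming one cached consumer per (topic, partition, group.id) per JVM.
object ConsumerCacheSizing {
  def worstCase(numGroupIds: Int, partitionsPerTopic: Int): Int =
    numGroupIds * partitionsPerTopic

  def main(args: Array[String]): Unit = {
    // 4 queries, each with its own group.id, all reading one 90-partition topic.
    println(worstCase(numGroupIds = 4, partitionsPerTopic = 90)) // prints 360
  }
}
{code}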

> Kafka consumer cache overflow since 2.4.x
> -
>
> Key: SPARK-26396
> URL: https://issues.apache.org/jira/browse/SPARK-26396
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4 standalone client mode
>Reporter: Kaspar Tint
>Priority: Major
>
> We are experiencing an issue where the Kafka consumer cache seems to overflow 
> constantly upon starting the application. This issue appeared after upgrading 
> to Spark 2.4.
> We would get constant warnings like this:
> {code:java}
> 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
> {code}
> This application is running 4 different Spark Structured Streaming queries 
> against the same Kafka topic that has 90 partitions. We used to run it with 
> just the default settings so it defaulted to cache size 64 on Spark 2.3 but 
> now we tried to put it to 180 or 360. With 360 we will have a lot less noise 
> about the overflow but resource need will increase substantially.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26216) Do not use case class as public API (ScalaUDF)

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725099#comment-16725099
 ] 

ASF GitHub Bot commented on SPARK-26216:


cloud-fan opened a new pull request #23351: [SPARK-26216][SQL][followup] use 
abstract class instead of trait for UserDefinedFunction
URL: https://github.com/apache/spark/pull/23351
 
 
   ## What changes were proposed in this pull request?
   
   A followup of https://github.com/apache/spark/pull/23178 , to keep binary 
   compatibility by using an abstract class.
   
   ## How was this patch tested?
   
   Manual test. I created a simple app with Spark 2.4
   ```
   object TryUDF {
 def main(args: Array[String]): Unit = {
   val spark = 
SparkSession.builder().appName("test").master("local[*]").getOrCreate()
   import spark.implicits._
   val f1 = udf((i: Int) => i + 1)
   spark.range(10).select(f1($"id")).show()
   spark.stop()
 }
   }
   ```
   
   When I run it with current master, it fails with
   ```
   java.lang.IncompatibleClassChangeError: Found interface 
org.apache.spark.sql.expressions.UserDefinedFunction, but class was expected
   ```
   
   When I run it with this PR, it works
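   
   For context, a sketch (with stand-in names, not the real Spark classes) of why 
   the class-versus-trait choice matters: a Scala trait compiles to a JVM 
   interface, so republishing a class as a trait changes how existing call sites 
   are linked and shows up as IncompatibleClassChangeError, like the error above.
   
   ```
   // Sketch only: UdfLike is a stand-in, not Spark's UserDefinedFunction.
   abstract class UdfLike {
     def apply(i: Int): Int
   }

   object Caller {
     // Compiled against the abstract class, this call site is linked as a
     // class method invocation.
     def run(f: UdfLike): Int = f.apply(41) + 1
   }

   // If UdfLike were later re-published as `trait UdfLike { def apply(i: Int): Int }`
   // (a JVM interface) without recompiling Caller, the old call site would fail
   // at link time with java.lang.IncompatibleClassChangeError.
   ```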
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Do not use case class as public API (ScalaUDF)
> --
>
> Key: SPARK-26216
> URL: https://issues.apache.org/jira/browse/SPARK-26216
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: release-notes
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725063#comment-16725063
 ] 

Gabor Somogyi edited comment on SPARK-26396 at 12/19/18 2:48 PM:
-

Number of executors can be set with {{--num-executors parameter, so it can be 
set differently for PROD and DEV.}}

{{There is no specific formula. One cached consumer will be created and stored 
inside a JVM in cache per (topic + partition + group.id).

 


was (Author: gsomogyi):
Number of executors can be set with {{--num-executors parameter, so it can be 
set differently for PROD and DEV.}}

{{There is no specific formula. One cached consumer will be created and stored 
inside a JVM in cache per (topic + partition + }}group.id).

 

> Kafka consumer cache overflow since 2.4.x
> -
>
> Key: SPARK-26396
> URL: https://issues.apache.org/jira/browse/SPARK-26396
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4 standalone client mode
>Reporter: Kaspar Tint
>Priority: Major
>
> We are experiencing an issue where the Kafka consumer cache seems to overflow 
> constantly upon starting the application. This issue appeared after upgrading 
> to Spark 2.4.
> We would get constant warnings like this:
> {code:java}
> 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
> {code}
> This application is running 4 different Spark Structured Streaming queries 
> against the same Kafka topic that has 90 partitions. We used to run it with 
> just the default settings so it defaulted to cache size 64 on Spark 2.3 but 
> now we tried to put it to 180 or 360. With 360 we will have a lot less noise 
> about the overflow but resource need will increase substantially.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725063#comment-16725063
 ] 

Gabor Somogyi edited comment on SPARK-26396 at 12/19/18 2:50 PM:
-

Number of executors can be set with --num-executors parameter, so it can be set 
differently for PROD and DEV.
There is no specific formula. One cached consumer will be created and stored 
inside a JVM cache per (topic + partition + group.id).

 


was (Author: gsomogyi):
Number of executors can be set with --num-executors parameter, so it can be set 
differently for PROD and DEV.
There is no specific formula. One cached consumer will be created and stored 
inside a JVM in cache per (topic + partition + group.id).

 

> Kafka consumer cache overflow since 2.4.x
> -
>
> Key: SPARK-26396
> URL: https://issues.apache.org/jira/browse/SPARK-26396
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4 standalone client mode
>Reporter: Kaspar Tint
>Priority: Major
>
> We are experiencing an issue where the Kafka consumer cache seems to overflow 
> constantly upon starting the application. This issue appeared after upgrading 
> to Spark 2.4.
> We would get constant warnings like this:
> {code:java}
> 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
> {code}
> This application is running 4 different Spark Structured Streaming queries 
> against the same Kafka topic that has 90 partitions. We used to run it with 
> just the default settings so it defaulted to cache size 64 on Spark 2.3 but 
> now we tried to put it to 180 or 360. With 360 we will have a lot less noise 
> about the overflow but resource need will increase substantially.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725063#comment-16725063
 ] 

Gabor Somogyi edited comment on SPARK-26396 at 12/19/18 2:49 PM:
-

Number of executors can be set with --num-executors parameter, so it can be set 
differently for PROD and DEV.
There is no specific formula. One cached consumer will be created and stored 
inside a JVM in cache per (topic + partition + group.id).

 


was (Author: gsomogyi):
Number of executors can be set with {{--num-executors parameter, so it can be 
set differently for PROD and DEV.}}

{{There is no specific formula. One cached consumer will be created and stored 
inside a JVM in cache per (topic + partition + group.id).

 

> Kafka consumer cache overflow since 2.4.x
> -
>
> Key: SPARK-26396
> URL: https://issues.apache.org/jira/browse/SPARK-26396
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4 standalone client mode
>Reporter: Kaspar Tint
>Priority: Major
>
> We are experiencing an issue where the Kafka consumer cache seems to overflow 
> constantly upon starting the application. This issue appeared after upgrading 
> to Spark 2.4.
> We would get constant warnings like this:
> {code:java}
> 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
> {code}
> This application is running 4 different Spark Structured Streaming queries 
> against the same Kafka topic that has 90 partitions. We used to run it with 
> just the default settings so it defaulted to cache size 64 on Spark 2.3 but 
> now we tried to put it to 180 or 360. With 360 we will have a lot less noise 
> about the overflow but resource need will increase substantially.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725063#comment-16725063
 ] 

Gabor Somogyi commented on SPARK-26396:
---

Number of executors can be set with the {{--num-executors}} parameter, so it can 
be set differently for PROD and DEV.

There is no specific formula. One cached consumer will be created and stored 
inside a JVM cache per (topic + partition + group.id).

 

> Kafka consumer cache overflow since 2.4.x
> -
>
> Key: SPARK-26396
> URL: https://issues.apache.org/jira/browse/SPARK-26396
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4 standalone client mode
>Reporter: Kaspar Tint
>Priority: Major
>
> We are experiencing an issue where the Kafka consumer cache seems to overflow 
> constantly upon starting the application. This issue appeared after upgrading 
> to Spark 2.4.
> We would get constant warnings like this:
> {code:java}
> 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
> {code}
> This application is running 4 different Spark Structured Streaming queries 
> against the same Kafka topic that has 90 partitions. We used to run it with 
> just the default settings so it defaulted to cache size 64 on Spark 2.3 but 
> now we tried to put it to 180 or 360. With 360 we will have a lot less noise 
> about the overflow but resource need will increase substantially.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-19 Thread Kaspar Tint (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725050#comment-16725050
 ] 

Kaspar Tint edited comment on SPARK-26396 at 12/19/18 2:39 PM:
---

Any exact formula to use for this when considering that the application can 
have many different queries? We don't need that many executors in dev for 
instance but in production we indeed have plenty of them.


was (Author: tint):
Any exact formula to use for this when considering that the application can 
have many different queries? 

> Kafka consumer cache overflow since 2.4.x
> -
>
> Key: SPARK-26396
> URL: https://issues.apache.org/jira/browse/SPARK-26396
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4 standalone client mode
>Reporter: Kaspar Tint
>Priority: Major
>
> We are experiencing an issue where the Kafka consumer cache seems to overflow 
> constantly upon starting the application. This issue appeared after upgrading 
> to Spark 2.4.
> We would get constant warnings like this:
> {code:java}
> 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
> {code}
> This application is running 4 different Spark Structured Streaming queries 
> against the same Kafka topic that has 90 partitions. We used to run it with 
> just the default settings so it defaulted to cache size 64 on Spark 2.3 but 
> now we tried to put it to 180 or 360. With 360 we will have a lot less noise 
> about the overflow but resource need will increase substantially.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-19 Thread Kaspar Tint (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725050#comment-16725050
 ] 

Kaspar Tint commented on SPARK-26396:
-

Any exact formula to use for this when considering that the application can 
have many different queries? 

> Kafka consumer cache overflow since 2.4.x
> -
>
> Key: SPARK-26396
> URL: https://issues.apache.org/jira/browse/SPARK-26396
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4 standalone client mode
>Reporter: Kaspar Tint
>Priority: Major
>
> We are experiencing an issue where the Kafka consumer cache seems to overflow 
> constantly upon starting the application. This issue appeared after upgrading 
> to Spark 2.4.
> We would get constant warnings like this:
> {code:java}
> 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
> {code}
> This application is running 4 different Spark Structured Streaming queries 
> against the same Kafka topic that has 90 partitions. We used to run it with 
> just the default settings so it defaulted to cache size 64 on Spark 2.3 but 
> now we tried to put it to 180 or 360. With 360 we will have a lot less noise 
> about the overflow but resource need will increase substantially.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725021#comment-16725021
 ] 

Gabor Somogyi edited comment on SPARK-26396 at 12/19/18 2:07 PM:
-

[~Tint] seems like you're trying to scale your application vertically which 
requires really strong machine.

Try to scale horizontally and add further executors. This way the load will be 
split up and reduce the amount of cached consumers per JVM.


was (Author: gsomogyi):
[~Tint] seems like you're trying to scale your application vertically which 
requires really strong machine.

Try to scale horizontally and add further executors. This way the load will be 
split up. This will reduce the amount of cached consumers per JVM.

> Kafka consumer cache overflow since 2.4.x
> -
>
> Key: SPARK-26396
> URL: https://issues.apache.org/jira/browse/SPARK-26396
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4 standalone client mode
>Reporter: Kaspar Tint
>Priority: Major
>
> We are experiencing an issue where the Kafka consumer cache seems to overflow 
> constantly upon starting the application. This issue appeared after upgrading 
> to Spark 2.4.
> We would get constant warnings like this:
> {code:java}
> 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
> {code}
> This application is running 4 different Spark Structured Streaming queries 
> against the same Kafka topic that has 90 partitions. We used to run it with 
> just the default settings so it defaulted to cache size 64 on Spark 2.3 but 
> now we tried to put it to 180 or 360. With 360 we will have a lot less noise 
> about the overflow but resource need will increase substantially.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-19 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725021#comment-16725021
 ] 

Gabor Somogyi commented on SPARK-26396:
---

[~Tint] seems like you're trying to scale your application vertically which 
requires really strong machine.

Try to scale horizontally and add further executors. This way the load will be 
split up. This will reduce the amount of cached consumers per JVM.

> Kafka consumer cache overflow since 2.4.x
> -
>
> Key: SPARK-26396
> URL: https://issues.apache.org/jira/browse/SPARK-26396
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4 standalone client mode
>Reporter: Kaspar Tint
>Priority: Major
>
> We are experiencing an issue where the Kafka consumer cache seems to overflow 
> constantly upon starting the application. This issue appeared after upgrading 
> to Spark 2.4.
> We would get constant warnings like this:
> {code:java}
> 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
> 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
> capacity of 180, removing consumer for 
> CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
> {code}
> This application is running 4 different Spark Structured Streaming queries 
> against the same Kafka topic that has 90 partitions. We used to run it with 
> just the default settings so it defaulted to cache size 64 on Spark 2.3 but 
> now we tried to put it to 180 or 360. With 360 we will have a lot less noise 
> about the overflow but resource need will increase substantially.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26407) For an external non-partitioned table, if add a directory named with k=v to the table path, select result will be wrong

2018-12-19 Thread Bao Yunz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bao Yunz updated SPARK-26407:
-
Description: 
Scenario 1

Create an external non-partitioned table whose location directory contains a 
directory named "part=1", and whose schema is (id, name), for example. There is 
some data in the "part=1" directory. Then desc the table: we will find that 
"part" has been added to the table schema as a column. When we insert two 
columns of data into the table, it will throw an exception that the target 
table has 3 columns but the inserted data has 2 columns.

Scenario 2

Create an external non-partitioned table whose location path is empty and whose 
schema is (id, name), for example. After several insert operations, we add a 
directory named "part=1", containing some data, under the table location 
directory. Then, when we do insert and select operations, we will find the scan 
path has changed to "tablePath/part=1", so we will get a wrong result.

The right logic should be that if a table is a non-partitioned table, adding a 
partition-like folder under tablePath should not change its schema or select 
result.

  was:
Scene 1

Create an external non-partition table, in which location directory has a 
directory named with "part=1", for example. Then desc the table, we will find 
the string "part" is showed in table column. when insert the table with data 
which has same column with target table , will throw a exception that target 
table has different column number with the inserted data. 

Scene 2

Create a external non-partition table, which location path is empty. After 
several times insert operation, we add a directory named with "part=1" in the 
table location directory. Then do insert and select operation, we will find the 
scan path is changed to "tablePath/part=1",so that we will get a wrong result.

 

It seems that the existing logic of spark will process this kind of table like 
a partition table. But when we do show partitions operation, it will throw the 
exception that the table is not partitioned, which is confusing。We believe that 
the normal logic should be that if a table is a non-partitioned table, the 
folder under tablePath should not change its basic properties.


> For an external non-partitioned table, if add a directory named with k=v to 
> the table path, select result will be wrong
> ---
>
> Key: SPARK-26407
> URL: https://issues.apache.org/jira/browse/SPARK-26407
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Bao Yunz
>Priority: Major
>  Labels: usability
>
> Scenario 1
> Create an external non-partitioned table whose location directory contains a 
> directory named "part=1", and whose schema is (id, name), for example. There 
> is some data in the "part=1" directory. Then desc the table: we will find that 
> "part" has been added to the table schema as a column. When we insert two 
> columns of data into the table, it will throw an exception that the target 
> table has 3 columns but the inserted data has 2 columns.
> Scenario 2
> Create an external non-partitioned table whose location path is empty and 
> whose schema is (id, name), for example. After several insert operations, we 
> add a directory named "part=1", containing some data, under the table location 
> directory. Then, when we do insert and select operations, we will find the 
> scan path has changed to "tablePath/part=1", so we will get a wrong result.
> The right logic should be that if a table is a non-partitioned table, adding a 
> partition-like folder under tablePath should not change its schema or select 
> result.
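
A minimal reproduction sketch of Scenario 1 from a spark-shell session; the path, 
table name and values below are placeholders:

{code:java}
// Sketch only: path, table name and sample rows are made up for illustration.
val base = "/tmp/spark26407_repro"

// Put some data under a partition-style sub-directory of the table location.
Seq((1, "a"), (2, "b")).toDF("id", "name")
  .write.mode("overwrite").parquet(s"$base/part=1")

// Create a *non-partitioned* external table over the parent directory.
spark.sql(s"CREATE TABLE t_repro (id INT, name STRING) USING parquet LOCATION '$base'")

// Per the report above, DESC now lists an extra "part" column, and a
// two-column INSERT INTO t_repro fails with a column-count mismatch.
spark.sql("DESC t_repro").show(truncate = false)
{code}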



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26407) For an external non-partitioned table, if add a directory named with k=v to the table path, select result will be wrong

2018-12-19 Thread Bao Yunz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bao Yunz updated SPARK-26407:
-
Affects Version/s: (was: 2.3.2)

> For an external non-partitioned table, if add a directory named with k=v to 
> the table path, select result will be wrong
> ---
>
> Key: SPARK-26407
> URL: https://issues.apache.org/jira/browse/SPARK-26407
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Bao Yunz
>Priority: Major
>  Labels: usability
>
> Scene 1
> Create a external non-partition table, in which location directory has a 
> directory named with "part=1", for example. Then desc the table, we will find 
> the string "part" is showed in table column. when insert the table with data 
> which has same column with target table , will throw a exception that target 
> table has different column number with the inserted data. 
> Scene 2
> Create a external non-partition table, which location path is empty. After 
> several times insert operation, we add a directory named with "part=1" in the 
> table location directory. Then do insert and select operation, we will find 
> the scan path is changed to "tablePath/part=1",so that we will get a wrong 
> result.
>  
> It seems that the existing logic of spark will process this kind of table 
> like a partition table. But when we do show partitions operation, it will 
> throw the exception that the table is not partitioned, which is confusing。We 
> believe that the normal logic should be that if a table is a non-partitioned 
> table, the folder under tablePath should not change its basic properties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26407) For an external non-partitioned table, if add a directory named with k=v to the table path, select result will be wrong

2018-12-19 Thread Bao Yunz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bao Yunz updated SPARK-26407:
-
Description: 
Scene 1

Create an external non-partition table, in which location directory has a 
directory named with "part=1", for example. Then desc the table, we will find 
the string "part" is showed in table column. when insert the table with data 
which has same column with target table , will throw a exception that target 
table has different column number with the inserted data. 

Scene 2

Create a external non-partition table, which location path is empty. After 
several times insert operation, we add a directory named with "part=1" in the 
table location directory. Then do insert and select operation, we will find the 
scan path is changed to "tablePath/part=1",so that we will get a wrong result.

 

It seems that the existing logic of spark will process this kind of table like 
a partition table. But when we do show partitions operation, it will throw the 
exception that the table is not partitioned, which is confusing。We believe that 
the normal logic should be that if a table is a non-partitioned table, the 
folder under tablePath should not change its basic properties.

  was:
Scene 1

Create a external non-partition table, in which location directory has a 
directory named with "part=1", for example. Then desc the table, we will find 
the string "part" is showed in table column. when insert the table with data 
which has same column with target table , will throw a exception that target 
table has different column number with the inserted data. 

Scene 2

Create a external non-partition table, which location path is empty. After 
several times insert operation, we add a directory named with "part=1" in the 
table location directory. Then do insert and select operation, we will find the 
scan path is changed to "tablePath/part=1",so that we will get a wrong result.

 

It seems that the existing logic of spark will process this kind of table like 
a partition table. But when we do show partitions operation, it will throw the 
exception that the table is not partitioned, which is confusing。We believe that 
the normal logic should be that if a table is a non-partitioned table, the 
folder under tablePath should not change its basic properties.


> For an external non-partitioned table, if add a directory named with k=v to 
> the table path, select result will be wrong
> ---
>
> Key: SPARK-26407
> URL: https://issues.apache.org/jira/browse/SPARK-26407
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Bao Yunz
>Priority: Major
>  Labels: usability
>
> Scene 1
> Create an external non-partition table, in which location directory has a 
> directory named with "part=1", for example. Then desc the table, we will find 
> the string "part" is showed in table column. when insert the table with data 
> which has same column with target table , will throw a exception that target 
> table has different column number with the inserted data. 
> Scene 2
> Create a external non-partition table, which location path is empty. After 
> several times insert operation, we add a directory named with "part=1" in the 
> table location directory. Then do insert and select operation, we will find 
> the scan path is changed to "tablePath/part=1",so that we will get a wrong 
> result.
>  
> It seems that the existing logic of spark will process this kind of table 
> like a partition table. But when we do show partitions operation, it will 
> throw the exception that the table is not partitioned, which is confusing。We 
> believe that the normal logic should be that if a table is a non-partitioned 
> table, the folder under tablePath should not change its basic properties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26407) For an external non-partitioned table, if add a directory named with k=v to the table path, select result will be wrong

2018-12-19 Thread Bao Yunz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bao Yunz updated SPARK-26407:
-
Summary: For an external non-partitioned table, if add a directory named 
with k=v to the table path, select result will be wrong  (was: For an external 
non-partition table, if add a directory named with k=v to the table path, 
select result will be wrong)

> For an external non-partitioned table, if add a directory named with k=v to 
> the table path, select result will be wrong
> ---
>
> Key: SPARK-26407
> URL: https://issues.apache.org/jira/browse/SPARK-26407
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Bao Yunz
>Priority: Major
>  Labels: usability
>
> Scene 1
> Create a external non-partition table, in which location directory has a 
> directory named with "part=1", for example. Then desc the table, we will find 
> the string "part" is showed in table column. when insert the table with data 
> which has same column with target table , will throw a exception that target 
> table has different column number with the inserted data. 
> Scene 2
> Create a external non-partition table, which location path is empty. After 
> several times insert operation, we add a directory named with "part=1" in the 
> table location directory. Then do insert and select operation, we will find 
> the scan path is changed to "tablePath/part=1",so that we will get a wrong 
> result.
>  
> It seems that the existing logic of spark will process this kind of table 
> like a partition table. But when we do show partitions operation, it will 
> throw the exception that the table is not partitioned, which is confusing。We 
> believe that the normal logic should be that if a table is a non-partitioned 
> table, the folder under tablePath should not change its basic properties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26407) For an external non-partition table, if add a directory named with k=v to the table path, select result will be wrong

2018-12-19 Thread Bao Yunz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bao Yunz updated SPARK-26407:
-
Summary: For an external non-partition table, if add a directory named with 
k=v to the table path, select result will be wrong  (was: Select result is 
incorrect when add a directory named with k=v to the table path of external 
non-partition table)

> For an external non-partition table, if add a directory named with k=v to the 
> table path, select result will be wrong
> -
>
> Key: SPARK-26407
> URL: https://issues.apache.org/jira/browse/SPARK-26407
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Bao Yunz
>Priority: Major
>  Labels: usability
>
> Scene 1
> Create a external non-partition table, in which location directory has a 
> directory named with "part=1", for example. Then desc the table, we will find 
> the string "part" is showed in table column. when insert the table with data 
> which has same column with target table , will throw a exception that target 
> table has different column number with the inserted data. 
> Scene 2
> Create a external non-partition table, which location path is empty. After 
> several times insert operation, we add a directory named with "part=1" in the 
> table location directory. Then do insert and select operation, we will find 
> the scan path is changed to "tablePath/part=1",so that we will get a wrong 
> result.
>  
> It seems that the existing logic of spark will process this kind of table 
> like a partition table. But when we do show partitions operation, it will 
> throw the exception that the table is not partitioned, which is confusing。We 
> believe that the normal logic should be that if a table is a non-partitioned 
> table, the folder under tablePath should not change its basic properties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26407) Select result is incorrect when add a directory named with k=v to the table path of external non-partition table

2018-12-19 Thread Bao Yunz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bao Yunz updated SPARK-26407:
-
Description: 
Scene 1

Create a external non-partition table, in which location directory has a 
directory named with "part=1", for example. Then desc the table, we will find 
the string "part" is showed in table column. when insert the table with data 
which has same column with target table , will throw a exception that target 
table has different column number with the inserted data. 

Scene 2

Create a external non-partition table, which location path is empty. After 
several times insert operation, we add a directory named with "part=1" in the 
table location directory. Then do insert and select operation, we will find the 
scan path is changed to "tablePath/part=1",so that we will get a wrong result.

 

It seems that the existing logic of spark will process this kind of table like 
a partition table. But when we do show partitions operation, it will throw the 
exception that the table is not partitioned, which is confusing。We believe that 
the normal logic should be that if a table is a non-partitioned table, the 
folder under tablePath should not change its basic properties.

  was:
Scene 1

Create a external non-partition table, in which location path has a directory 
named with "part=1", for example. Then desc the table, we will find the string 
"part" is showed in table column. when insert the table with data which has 
same column with target table , will throw a exception that target table has 
different column number with the inserted data. 

Scene 2

Create a external non-partition table, which location path is empty. After 
several times insert operation, we add a directory named with "part=1" in the 
table location directory. Then do insert and select operation, we will find the 
scan path is changed to "tablePath/part=1",so that we will get a wrong result.

 

It seems that the existing logic of spark will process this kind of table like 
a partition table. But when we do show partitions operation, it will throw the 
exception that the table is not partitioned, which is confusing。We believe that 
the normal logic should be that if a table is a non-partitioned table, the 
folder under tablePath should not change its basic properties.


> Select result is incorrect when add a directory named with k=v to the table 
> path of external non-partition table
> 
>
> Key: SPARK-26407
> URL: https://issues.apache.org/jira/browse/SPARK-26407
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Bao Yunz
>Priority: Major
>  Labels: usability
>
> Scene 1
> Create a external non-partition table, in which location directory has a 
> directory named with "part=1", for example. Then desc the table, we will find 
> the string "part" is showed in table column. when insert the table with data 
> which has same column with target table , will throw a exception that target 
> table has different column number with the inserted data. 
> Scene 2
> Create a external non-partition table, which location path is empty. After 
> several times insert operation, we add a directory named with "part=1" in the 
> table location directory. Then do insert and select operation, we will find 
> the scan path is changed to "tablePath/part=1",so that we will get a wrong 
> result.
>  
> It seems that the existing logic of spark will process this kind of table 
> like a partition table. But when we do show partitions operation, it will 
> throw the exception that the table is not partitioned, which is confusing。We 
> believe that the normal logic should be that if a table is a non-partitioned 
> table, the folder under tablePath should not change its basic properties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26407) Select result is incorrect when add a directory named with k=v to the table path of external non-partition table

2018-12-19 Thread Bao Yunz (JIRA)
Bao Yunz created SPARK-26407:


 Summary: Select result is incorrect when add a directory named 
with k=v to the table path of external non-partition table
 Key: SPARK-26407
 URL: https://issues.apache.org/jira/browse/SPARK-26407
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0, 2.3.2
Reporter: Bao Yunz


Scene 1

Create an external non-partitioned table whose location path contains a 
directory named "part=1", for example. Then describe the table: the string 
"part" shows up as a table column. When we insert data that has the same 
columns as the target table, an exception is thrown saying that the target 
table has a different column count than the inserted data.

Scene 2

Create an external non-partitioned table whose location path is empty. After 
several insert operations, we add a directory named "part=1" under the table 
location directory. Then, after an insert and a select, we will find that the 
scan path has changed to "tablePath/part=1", so we get a wrong result.

 

It seems that Spark's existing logic processes this kind of table like a 
partitioned table. But when we run a SHOW PARTITIONS operation, it throws an 
exception saying the table is not partitioned, which is confusing. We believe 
the correct behavior is that, for a non-partitioned table, a folder under 
tablePath should not change the table's basic properties.
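For readers trying to reproduce this, a minimal sketch of Scene 2 (the table name, 
column, and the path /tmp/t26407 are illustrative assumptions, not taken from the 
report; assumes an existing SparkSession `spark`):
{code:scala}
// Hedged reproduction sketch for Scene 2.
import spark.implicits._

// LOCATION makes this an external table backed by the given directory.
spark.sql("CREATE TABLE t26407 (id INT) USING parquet LOCATION '/tmp/t26407'")
spark.sql("INSERT INTO t26407 VALUES (1), (2)")

// Simulate an unrelated process dropping a k=v-style directory under the table path.
Seq(10, 11).toDF("id").write.parquet("/tmp/t26407/part=1")

// According to the report, the scan path silently becomes tablePath/part=1 here,
// so the originally inserted rows (1, 2) are no longer returned.
spark.sql("SELECT * FROM t26407").show()
{code}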



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26406) Add option to skip rows when reading csv files

2018-12-19 Thread Thomas Kastl (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Kastl updated SPARK-26406:
-
Description: 
Real-world data can contain multiple header lines. Spark currently does not 
offer any way to skip more than one header row.

Several workarounds are proposed on stackoverflow (manually editing each csv 
file by adding "#" to the rows and using the comment option, or filtering after 
reading) but all of them are workarounds with more or less obvious drawbacks 
and restrictions.

The option
{code:java}
header=True{code}
already treats the first row of csv files differently, so the argument that 
Spark wants to be row-order agnostic does not really hold here in my opinion. A 
solution like pandas'
{code:java}
skiprows={code}
would be highly preferable.

  was:
Real-world data can contain multiple header lines. Spark currently does not 
offer any way to skip more than one header row.

Several workarounds are proposed on stackoverflow (manually editing each csv 
file by adding "#" to the rows and using the comment option, or filtering after 
reading) but all of them are workarounds with more or less obvious drawbacks 
and restrictions.

The option
{code:java}
header=True{code}
already treats the first row of csv files differently, so the argument that 
Spark wants to be row-order agnostic does not really hold here in my opinion. A 
solution like pandas
{code:java}
skiprows={code}
would be highly preferable.


> Add option to skip rows when reading csv files
> --
>
> Key: SPARK-26406
> URL: https://issues.apache.org/jira/browse/SPARK-26406
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Thomas Kastl
>Priority: Minor
>
> Real-world data can contain multiple header lines. Spark currently does not 
> offer any way to skip more than one header row.
> Several workarounds are proposed on stackoverflow (manually editing each csv 
> file by adding "#" to the rows and using the comment option, or filtering 
> after reading) but all of them are workarounds with more or less obvious 
> drawbacks and restrictions.
> The option
> {code:java}
> header=True{code}
> already treats the first row of csv files differently, so the argument that 
> Spark wants to be row-order agnostic does not really hold here in my opinion. 
> A solution like pandas'
> {code:java}
> skiprows={code}
> would be highly preferable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26406) Add option to skip rows when reading csv files

2018-12-19 Thread Thomas Kastl (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Kastl updated SPARK-26406:
-
Description: 
Real-world data can contain multiple header lines. Spark currently does not 
offer any way to skip more than one header row.

Several workarounds are proposed on stackoverflow (manually editing each csv 
file by adding "#" to the rows and using the comment option, or filtering after 
reading) but all of them are workarounds with more or less obvious drawbacks 
and restrictions.

The option
{code:java}
header=True{code}
already treats the first row of csv files differently, so the argument that 
Spark wants to be row-order agnostic does not really hold here in my opinion. A 
solution like pandas
{code:java}
skiprows={code}
would be highly preferable.

  was:
Real-world data can contain multiple header lines. Spark currently does not 
offer any way to skip more than one header row.

Several workarounds are proposed on stackoverflow (manually editing each csv 
file by adding "#" to the rows and using the comment option, or filtering after 
reading) but all of them are workarounds with more or less obvious drawbacks 
and restrictions.

The option
{code:java}
header=True{code}
already treats the first row of csv files differently, so the argument that 
Spark wants to be row-agnostic does not really hold here in my opinion. A 
solution like pandas
{code:java}
skiprows={code}
would be highly preferable.


> Add option to skip rows when reading csv files
> --
>
> Key: SPARK-26406
> URL: https://issues.apache.org/jira/browse/SPARK-26406
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Thomas Kastl
>Priority: Minor
>
> Real-world data can contain multiple header lines. Spark currently does not 
> offer any way to skip more than one header row.
> Several workarounds are proposed on stackoverflow (manually editing each csv 
> file by adding "#" to the rows and using the comment option, or filtering 
> after reading) but all of them are workarounds with more or less obvious 
> drawbacks and restrictions.
> The option
> {code:java}
> header=True{code}
> already treats the first row of csv files differently, so the argument that 
> Spark wants to be row-order agnostic does not really hold here in my opinion. 
> A solution like pandas
> {code:java}
> skiprows={code}
> would be highly preferable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26406) Add option to skip rows when reading csv files

2018-12-19 Thread Thomas Kastl (JIRA)
Thomas Kastl created SPARK-26406:


 Summary: Add option to skip rows when reading csv files
 Key: SPARK-26406
 URL: https://issues.apache.org/jira/browse/SPARK-26406
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Thomas Kastl


Real-world data can contain multiple header lines. Spark currently does not 
offer any way to skip more than one header row.

Several workarounds are proposed on stackoverflow (manually editing each csv 
file by adding "#" to the rows and using the comment option, or filtering after 
reading) but all of them are workarounds with more or less obvious drawbacks 
and restrictions.

The option
{code:java}
header=True{code}
already treats the first row of csv files differently, so the argument that 
Spark wants to be row-agnostic does not really hold here in my opinion. A 
solution like pandas
{code:java}
skiprows={code}
would be highly preferable.
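As an aside, a hedged sketch of the "filtering after reading" workaround mentioned 
above (the path and the number of skipped lines are illustrative assumptions; 
assumes an existing SparkSession `spark`):
{code:scala}
// Drop the first nSkip physical lines, then parse the rest as CSV.
import spark.implicits._

val path = "/tmp/data.csv"   // illustrative path
val nSkip = 3                // illustrative number of extra header lines to drop

val body = spark.read.textFile(path).rdd
  .zipWithIndex()                           // attach a stable line index
  .filter { case (_, idx) => idx >= nSkip } // skip the leading header lines
  .map { case (line, _) => line }
  .toDS()

// The first surviving line is treated as the real header.
val df = spark.read.option("header", "true").csv(body)
df.show()
{code}
This is exactly the kind of workaround the description calls restrictive: it reads 
the file as text first and relies on line order, which a built-in skiprows-style 
option would avoid.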



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26366) Except with transform regression

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724868#comment-16724868
 ] 

ASF GitHub Bot commented on SPARK-26366:


mgaido91 opened a new pull request #23350: [SPARK-26366][SQL][BACKPORT-2.3] 
ReplaceExceptWithFilter should consider NULL as False
URL: https://github.com/apache/spark/pull/23350
 
 
   ## What changes were proposed in this pull request?
   
   In `ReplaceExceptWithFilter` we do not properly handle the case in which the 
condition evaluates to NULL. Since negating NULL still returns NULL, the 
assumption that negating the condition returns all the rows which didn't 
satisfy it does not hold: rows for which the condition evaluates to NULL may 
not be returned. This happens when the constraints inferred by 
`InferFiltersFromConstraints` are not enough, as with `OR` conditions.
   
   The rule also had problems with non-deterministic conditions: in such a 
scenario, it would change the probability of the output.
   
   The PR fixes these problems by:
- returning False when the condition is NULL (in this way we do return all the 
rows which didn't satisfy it);
- avoiding any transformation when the condition is non-deterministic.
   
   ## How was this patch tested?
   
   added UTs
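For illustration only (not part of the PR), a minimal sketch of the three-valued 
logic behind the first point, assuming a running SparkSession `spark`:
```scala
// NOT(NULL) is still NULL, and Filter keeps only rows whose condition is TRUE,
// so rows whose condition evaluates to NULL are dropped unless NULL is treated as FALSE.
import org.apache.spark.sql.functions.{lit, not}

val nullCond = lit(null).cast("boolean")
spark.range(1)
  .select(nullCond.as("cond"), not(nullCond).as("not_cond"))
  .show()
// Both columns print null, which is why negating the condition is not the same as
// "all the rows which didn't satisfy it".
```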
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Except with transform regression
> 
>
> Key: SPARK-26366
> URL: https://issues.apache.org/jira/browse/SPARK-26366
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.3.2
>Reporter: Dan Osipov
>Assignee: Marco Gaido
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
>
> There appears to be a regression between Spark 2.2 and 2.3. Below is the code 
> to reproduce it:
>  
> {code:java}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val inputDF = spark.sqlContext.createDataFrame(
>   spark.sparkContext.parallelize(Seq(
> Row("0", "john", "smith", "j...@smith.com"),
> Row("1", "jane", "doe", "j...@doe.com"),
> Row("2", "apache", "spark", "sp...@apache.org"),
> Row("3", "foo", "bar", null)
>   )),
>   StructType(List(
> StructField("id", StringType, nullable=true),
> StructField("first_name", StringType, nullable=true),
> StructField("last_name", StringType, nullable=true),
> StructField("email", StringType, nullable=true)
>   ))
> )
> val exceptDF = inputDF.transform( toProcessDF =>
>   toProcessDF.filter(
>   (
> col("first_name").isin(Seq("john", "jane"): _*)
>   and col("last_name").isin(Seq("smith", "doe"): _*)
>   )
>   or col("email").isin(List(): _*)
>   )
> )
> inputDF.except(exceptDF).show()
> {code}
> Output with Spark 2.2:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> |  3|       foo|      bar|            null|
> +---+----------+---------+----------------+{noformat}
> Output with Spark 2.3:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> +---+----------+---------+----------------+{noformat}
> Note, changing the last line to 
> {code:java}
> inputDF.except(exceptDF.cache()).show()
> {code}
> produces identical output for both Spark 2.3 and 2.2
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26405) OOM

2018-12-19 Thread lu (JIRA)
lu created SPARK-26405:
--

 Summary: OOM
 Key: SPARK-26405
 URL: https://issues.apache.org/jira/browse/SPARK-26405
 Project: Spark
  Issue Type: Bug
  Components: Java API, Scheduler, Shuffle, Spark Core, Spark Submit
Affects Versions: 2.2.0
Reporter: lu


Heap memory overflow occurred during user-profile analysis; the data volume 
analyzed was about 10 million records.

Spark worker memory: 4G

The job is submitted with RestSubmissionClient.

Both the driver memory and the executor memory: 4g

Total executor cores: 6

Spark cores: 2

Cluster size: 3
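For what it's worth, a minimal sketch of the broadcast-related workaround that the 
error message in the log below suggests (an illustration, not a verified fix for 
this particular job; assumes an existing SparkSession `spark`):
{code:scala}
// Disabling automatic broadcast joins avoids building the in-memory broadcast
// table that overflowed the driver heap.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// The alternative named in the message, a larger driver heap, has to be set at
// submit time (for example with --driver-memory); it cannot be changed at runtime.
{code}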

 

INFO worker.WorkerWatcher: Connecting to worker 
spark://Worker@192.168.44.181:45315
Exception in thread "broadcast-exchange-3" java.lang.OutOfMemoryError: Not 
enough memory to build and broadcast the table to all worker nodes. As a 
workaround, you can either disable broadcast by setting 
spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory 
by setting spark.driver.memory to a higher value
 at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:102)
 at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
 at 
org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:103)
 at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
 at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
 at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
 at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 
seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
 at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
 at 
org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
 at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
 at 
org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:126)
 at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
 at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
 at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
 at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
 at 
org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:36)
 at 
org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:68)
 at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
 at 
org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:88)
 at 
org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:209)
 at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
 at 
org.apache.spark.sql.execution.SerializeFromObjectExec.consume(objects.scala:107)
 at 
org.apache.spark.sql.execution.SerializeFromObjectExec.doConsume(objects.scala:129)
 at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
 at 

[jira] [Commented] (SPARK-26403) DataFrame pivot using array column fails with "Unsupported literal type class"

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724839#comment-16724839
 ] 

ASF GitHub Bot commented on SPARK-26403:


HyukjinKwon opened a new pull request #23349: [SPARK-26403][SQL] Support 
pivoting using array column for `pivot(column)` API
URL: https://github.com/apache/spark/pull/23349
 
 
   ## What changes were proposed in this pull request?
   
   This PR fixes `Literal(..: Any)` so that it accepts 
`collection.mutable.WrappedArray`, which in turn lets `pivot(Column)` accept an 
array column as well.
   
   We can unwrap the array and use it for type dispatch.
   
   ```scala
   val df = Seq(
 (2, Seq.empty[String]),
 (2, Seq("a", "x")),
 (3, Seq.empty[String]),
 (3, Seq("a", "x"))).toDF("x", "s")
   df.groupBy("x").pivot("s").count().show()
   ```
   
   Before:
   
   ```
   Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef 
WrappedArray()
   java.lang.RuntimeException: Unsupported literal type class 
scala.collection.mutable.WrappedArray$ofRef WrappedArray()
at 
org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:80)
at 
org.apache.spark.sql.RelationalGroupedDataset.$anonfun$pivot$2(RelationalGroupedDataset.scala:427)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:39)
at scala.collection.TraversableLike.map(TraversableLike.scala:237)
at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:425)
at 
org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:406)
at 
org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:317)
at 
org.apache.spark.sql.DataFramePivotSuite.$anonfun$new$1(DataFramePivotSuite.scala:341)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   ```
   
   After:
   
   ```
   +---+---+------+
   |  x| []|[a, x]|
   +---+---+------+
   |  3|  1|     1|
   |  2|  1|     1|
   +---+---+------+
   ```
   
   ## How was this patch tested?
   
   Manually tested and unittests were added.
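For readers, a hedged sketch of the unwrapping idea described above (the helper 
name is illustrative; this is not the actual patch):
```scala
// Expose the underlying Array of a WrappedArray before dispatching on the value's
// type, so an array-typed pivot value can be handled like any other Array.
import scala.collection.mutable.WrappedArray

def unwrapForLiteral(v: Any): Any = v match {
  case w: WrappedArray[_] => w.array   // the plain Array behind the wrapper
  case other              => other
}
```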
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DataFrame pivot using array column fails with "Unsupported literal type class"
> --
>
> Key: SPARK-26403
> URL: https://issues.apache.org/jira/browse/SPARK-26403
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Huon Wilson
>Priority: Minor
>
> Doing a pivot (using the {{pivot(pivotColumn: Column)}} overload) on a column 
> containing arrays results in a runtime error:
> {code:none}
> scala> val df = Seq((1, Seq("a", "x"), 2), (1, Seq("b"), 3), (2, Seq("a", 
> "x"), 10), (3, Seq(), 100)).toDF("x", "s", "y")
> df: org.apache.spark.sql.DataFrame = [x: int, s: array<string> ... 1 more 
> field]
> scala> df.show
> +---+------+---+
> |  x|     s|  y|
> +---+------+---+
> |  1|[a, x]|  2|
> |  1|   [b]|  3|
> |  2|[a, x]| 10|
> |  3|    []|100|
> +---+------+---+
> scala> df.groupBy("x").pivot("s").agg(collect_list($"y")).show
> java.lang.RuntimeException: Unsupported literal type class 
> scala.collection.mutable.WrappedArray$ofRef WrappedArray()
>   at 
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> 

[jira] [Assigned] (SPARK-26403) DataFrame pivot using array column fails with "Unsupported literal type class"

2018-12-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26403:


Assignee: Apache Spark

> DataFrame pivot using array column fails with "Unsupported literal type class"
> --
>
> Key: SPARK-26403
> URL: https://issues.apache.org/jira/browse/SPARK-26403
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Huon Wilson
>Assignee: Apache Spark
>Priority: Minor
>
> Doing a pivot (using the {{pivot(pivotColumn: Column)}} overload) on a column 
> containing arrays results in a runtime error:
> {code:none}
> scala> val df = Seq((1, Seq("a", "x"), 2), (1, Seq("b"), 3), (2, Seq("a", 
> "x"), 10), (3, Seq(), 100)).toDF("x", "s", "y")
> df: org.apache.spark.sql.DataFrame = [x: int, s: array<string> ... 1 more 
> field]
> scala> df.show
> +---+------+---+
> |  x|     s|  y|
> +---+------+---+
> |  1|[a, x]|  2|
> |  1|   [b]|  3|
> |  2|[a, x]| 10|
> |  3|    []|100|
> +---+------+---+
> scala> df.groupBy("x").pivot("s").agg(collect_list($"y")).show
> java.lang.RuntimeException: Unsupported literal type class 
> scala.collection.mutable.WrappedArray$ofRef WrappedArray()
>   at 
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:419)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:397)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:317)
>   ... 49 elided
> {code}
> However, this doesn't seem to be a fundamental limitation with {{pivot}}, as 
> it works fine using the {{pivot(pivotColumn: Column, values: Seq[Any])}} 
> overload, as long as the arrays are mapped to the {{Array}} type:
> {code:none}
> scala> val rawValues = df.select("s").distinct.sort("s").collect
> rawValues: Array[org.apache.spark.sql.Row] = Array([WrappedArray()], 
> [WrappedArray(a, x)], [WrappedArray(b)])
> scala> val values = rawValues.map(_.getSeq[String](0).to[Array])
> values: Array[Array[String]] = Array(Array(), Array(a, x), Array(b))
> scala> df.groupBy("x").pivot("s", values).agg(collect_list($"y")).show
> +---+-----+------+---+
> |  x|   []|[a, x]|[b]|
> +---+-----+------+---+
> |  1|   []|   [2]|[3]|
> |  3|[100]|    []| []|
> |  2|   []|  [10]| []|
> +---+-----+------+---+
> {code}
> It would be nice if {{pivot}} was more resilient to Spark's own 
> representation of array columns, and so the first version worked.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26403) DataFrame pivot using array column fails with "Unsupported literal type class"

2018-12-19 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26403:


Assignee: (was: Apache Spark)

> DataFrame pivot using array column fails with "Unsupported literal type class"
> --
>
> Key: SPARK-26403
> URL: https://issues.apache.org/jira/browse/SPARK-26403
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Huon Wilson
>Priority: Minor
>
> Doing a pivot (using the {{pivot(pivotColumn: Column)}} overload) on a column 
> containing arrays results in a runtime error:
> {code:none}
> scala> val df = Seq((1, Seq("a", "x"), 2), (1, Seq("b"), 3), (2, Seq("a", 
> "x"), 10), (3, Seq(), 100)).toDF("x", "s", "y")
> df: org.apache.spark.sql.DataFrame = [x: int, s: array<string> ... 1 more 
> field]
> scala> df.show
> +---+------+---+
> |  x|     s|  y|
> +---+------+---+
> |  1|[a, x]|  2|
> |  1|   [b]|  3|
> |  2|[a, x]| 10|
> |  3|    []|100|
> +---+------+---+
> scala> df.groupBy("x").pivot("s").agg(collect_list($"y")).show
> java.lang.RuntimeException: Unsupported literal type class 
> scala.collection.mutable.WrappedArray$ofRef WrappedArray()
>   at 
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:419)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:397)
>   at 
> org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:317)
>   ... 49 elided
> {code}
> However, this doesn't seem to be a fundamental limitation with {{pivot}}, as 
> it works fine using the {{pivot(pivotColumn: Column, values: Seq[Any])}} 
> overload, as long as the arrays are mapped to the {{Array}} type:
> {code:none}
> scala> val rawValues = df.select("s").distinct.sort("s").collect
> rawValues: Array[org.apache.spark.sql.Row] = Array([WrappedArray()], 
> [WrappedArray(a, x)], [WrappedArray(b)])
> scala> val values = rawValues.map(_.getSeq[String](0).to[Array])
> values: Array[Array[String]] = Array(Array(), Array(a, x), Array(b))
> scala> df.groupBy("x").pivot("s", values).agg(collect_list($"y")).show
> +---+-----+------+---+
> |  x|   []|[a, x]|[b]|
> +---+-----+------+---+
> |  1|   []|   [2]|[3]|
> |  3|[100]|    []| []|
> |  2|   []|  [10]| []|
> +---+-----+------+---+
> {code}
> It would be nice if {{pivot}} was more resilient to Spark's own 
> representation of array columns, and so the first version worked.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26375) Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics

2018-12-19 Thread Wang, Gang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724827#comment-16724827
 ] 

Wang, Gang commented on SPARK-26375:


Should be okay; a filter on partition columns is also treated as a normal 
filter, and the output statistics are estimated in the FilterEstimation class.

 

> Rule PruneFileSourcePartitions should be fired before any other rules based 
> on table statistics
> ---
>
> Key: SPARK-26375
> URL: https://issues.apache.org/jira/browse/SPARK-26375
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wang, Gang
>Priority: Major
>
> In Catalyst, some optimizer rules are based on table statistics, such as 
> ReorderJoin (in which star schemas are detected) and CostBasedJoinReorder. In 
> these rules, statistics accuracy is crucial. However, currently all these 
> rules are fired before partition pruning, which may result in inaccurate 
> statistics.
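One hedged way to observe the effect being described (assumes a table `sales` 
partitioned by `dt` already exists; names are illustrative):
{code:scala}
// Compare the optimizer's size estimate with and without a partition filter.
// If pruning runs before the statistics-based rules, the filtered plan should
// report a much smaller sizeInBytes to ReorderJoin / CostBasedJoinReorder.
val full   = spark.table("sales").queryExecution.optimizedPlan.stats
val oneDay = spark.table("sales").filter("dt = '2018-12-19'")
  .queryExecution.optimizedPlan.stats

println(s"full table estimate:       ${full.sizeInBytes}")
println(s"single partition estimate: ${oneDay.sizeInBytes}")
{code}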



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26375) Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics

2018-12-19 Thread Wang, Gang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang, Gang resolved SPARK-26375.

Resolution: Not A Problem

> Rule PruneFileSourcePartitions should be fired before any other rules based 
> on table statistics
> ---
>
> Key: SPARK-26375
> URL: https://issues.apache.org/jira/browse/SPARK-26375
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wang, Gang
>Priority: Major
>
> In Catalyst, some optimizer rules are based on table statistics, such as 
> ReorderJoin (in which star schemas are detected) and CostBasedJoinReorder. In 
> these rules, statistics accuracy is crucial. However, currently all these 
> rules are fired before partition pruning, which may result in inaccurate 
> statistics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org