[jira] [Commented] (SPARK-26308) Large BigDecimal value is converted to null when passed into a UDF
[ https://issues.apache.org/jira/browse/SPARK-26308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725614#comment-16725614 ]

ASF GitHub Bot commented on SPARK-26308:

asfgit closed pull request #23308: [SPARK-26308][SQL] Avoid cast of decimals for ScalaUDF
URL: https://github.com/apache/spark/pull/23308

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 133fa119b7aa6..1706b3eece6d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -879,6 +879,37 @@ object TypeCoercion {
           }
         }
         e.withNewChildren(children)
+
+      case udf: ScalaUDF if udf.inputTypes.nonEmpty =>
+        val children = udf.children.zip(udf.inputTypes).map { case (in, expected) =>
+          implicitCast(in, udfInputToCastType(in.dataType, expected)).getOrElse(in)
+        }
+        udf.withNewChildren(children)
+    }
+
+    private def udfInputToCastType(input: DataType, expectedType: DataType): DataType = {
+      (input, expectedType) match {
+        // SPARK-26308: avoid casting to an arbitrary precision and scale for decimals. Please note
+        // that precision and scale cannot be inferred properly for a ScalaUDF because, when it is
+        // created, it is not bound to any column. So here the precision and scale of the input
+        // column is used.
+        case (in: DecimalType, _: DecimalType) => in
+        case (ArrayType(dtIn, _), ArrayType(dtExp, nullableExp)) =>
+          ArrayType(udfInputToCastType(dtIn, dtExp), nullableExp)
+        case (MapType(keyDtIn, valueDtIn, _), MapType(keyDtExp, valueDtExp, nullableExp)) =>
+          MapType(udfInputToCastType(keyDtIn, keyDtExp),
+            udfInputToCastType(valueDtIn, valueDtExp),
+            nullableExp)
+        case (StructType(fieldsIn), StructType(fieldsExp)) =>
+          val fieldTypes =
+            fieldsIn.map(_.dataType).zip(fieldsExp.map(_.dataType)).map { case (dtIn, dtExp) =>
+              udfInputToCastType(dtIn, dtExp)
+            }
+          StructType(fieldsExp.zip(fieldTypes).map { case (field, newDt) =>
+            field.copy(dataType = newDt)
+          })
+        case (_, other) => other
+      }
     }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index fae90caebf96c..a23aaa3a0b3ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -52,7 +52,7 @@ case class ScalaUDF(
     udfName: Option[String] = None,
     nullable: Boolean = true,
     udfDeterministic: Boolean = true)
-  extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {
+  extends Expression with NonSQLExpression with UserDefinedExpression {
 
   // The constructor for SPARK 2.1 and 2.2
   def this(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 20dcefa7e3cad..a26d306cff6b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.math.BigDecimal
+
 import org.apache.spark.sql.api.java._
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.QueryExecution
@@ -26,7 +28,7 @@ import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationComm
 import org.apache.spark.sql.functions.{lit, udf}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
-import org.apache.spark.sql.types.{DataTypes, DoubleType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.QueryExecutionListener
 
@@ -420,4 +422,32 @@ class UDFSuite extends QueryTest with SharedSQLContext {
       checkAnswer(df, Seq(Row("null1x"), Row(null), Row("N3null")))
     }
   }
+
+  test("SPARK-26308: udf with decimal") {
+    val df1 = spark.createDataFrame(
+      sparkContext.parallelize(Seq(Row(new BigDecimal("20110002456556",
+      StructType(Seq(StructField("col1", DecimalType(30, 0)
+    val udf1 = org.apache.spark.sql.functions.udf((value:
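The heart of the fix is the recursive rule in `udfInputToCastType`: a decimal argument keeps its own precision and scale, arrays and maps are rewritten element by element, and any other type is cast to whatever the UDF declares. The Java sketch below models that rule under stated assumptions: `DecimalT`, `ArrayT`, `MapT` and `AtomicT` are hypothetical, simplified stand-ins rather than Spark classes, and the nullability flags and the struct case from the diff are left out for brevity.

```java
import java.util.Objects;

// Hypothetical, simplified stand-ins for Spark SQL data types (NOT Spark's classes).
interface SqlType {}

final class DecimalT implements SqlType {
    final int precision;
    final int scale;
    DecimalT(int precision, int scale) { this.precision = precision; this.scale = scale; }
    @Override public boolean equals(Object o) {
        return o instanceof DecimalT
            && ((DecimalT) o).precision == precision
            && ((DecimalT) o).scale == scale;
    }
    @Override public int hashCode() { return Objects.hash(precision, scale); }
    @Override public String toString() { return "decimal(" + precision + "," + scale + ")"; }
}

final class ArrayT implements SqlType {
    final SqlType element;
    ArrayT(SqlType element) { this.element = element; }
    @Override public boolean equals(Object o) {
        return o instanceof ArrayT && ((ArrayT) o).element.equals(element);
    }
    @Override public int hashCode() { return Objects.hash("array", element); }
    @Override public String toString() { return "array<" + element + ">"; }
}

final class MapT implements SqlType {
    final SqlType key;
    final SqlType value;
    MapT(SqlType key, SqlType value) { this.key = key; this.value = value; }
    @Override public boolean equals(Object o) {
        return o instanceof MapT && ((MapT) o).key.equals(key) && ((MapT) o).value.equals(value);
    }
    @Override public int hashCode() { return Objects.hash("map", key, value); }
    @Override public String toString() { return "map<" + key + "," + value + ">"; }
}

// Any other type, identified only by its name (e.g. "string", "bigint").
final class AtomicT implements SqlType {
    final String name;
    AtomicT(String name) { this.name = name; }
    @Override public boolean equals(Object o) {
        return o instanceof AtomicT && ((AtomicT) o).name.equals(name);
    }
    @Override public int hashCode() { return name.hashCode(); }
    @Override public String toString() { return name; }
}

final class UdfCastRule {
    // Chooses the type a UDF argument is cast to, following the shape of
    // udfInputToCastType: decimals keep the input column's precision and scale,
    // containers recurse, everything else takes the UDF's expected type.
    static SqlType castTarget(SqlType input, SqlType expected) {
        if (input instanceof DecimalT && expected instanceof DecimalT) {
            return input;
        }
        if (input instanceof ArrayT && expected instanceof ArrayT) {
            return new ArrayT(castTarget(((ArrayT) input).element, ((ArrayT) expected).element));
        }
        if (input instanceof MapT && expected instanceof MapT) {
            MapT in = (MapT) input;
            MapT exp = (MapT) expected;
            return new MapT(castTarget(in.key, exp.key), castTarget(in.value, exp.value));
        }
        return expected;
    }

    public static void main(String[] args) {
        // A decimal(30,0) column passed to a UDF declared with decimal(38,18):
        // prints decimal(30,0), i.e. no narrowing cast is introduced.
        System.out.println(castTarget(new DecimalT(30, 0), new DecimalT(38, 18)));
    }
}
```

Keeping the input's own decimal type is what makes the fix work: the UDF receives the column's value unchanged instead of a value that first had to fit into the declared precision and scale.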
[jira] [Resolved] (SPARK-26308) Large BigDecimal value is converted to null when passed into a UDF
[ https://issues.apache.org/jira/browse/SPARK-26308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-26308.
---------------------------------
    Resolution: Fixed
 Fix Version/s: 3.0.0

Issue resolved by pull request 23308
[https://github.com/apache/spark/pull/23308]

> Large BigDecimal value is converted to null when passed into a UDF
> ------------------------------------------------------------------
>
>                 Key: SPARK-26308
>                 URL: https://issues.apache.org/jira/browse/SPARK-26308
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Jay Pranavamurthi
>            Assignee: Marco Gaido
>            Priority: Major
>             Fix For: 3.0.0
>
>
> We are loading a Hive table into a Spark DataFrame. The Hive table has a decimal(30, 0) column with values greater than Long.MAX_VALUE. The DataFrame loads correctly.
> We then use a UDF to convert the decimal value to a String. For decimal values < Long.MAX_VALUE this works fine, but when the decimal value is > Long.MAX_VALUE, the input to the UDF is *null*.
> Hive table schema and data:
> {code:java}
> create table decimal_test (col1 decimal(30, 0), col2 decimal(10, 0), col3 int, col4 string);
> insert into decimal_test values(20110002456556, 123456789, 10, 'test1');
> {code}
>
> Execution in spark-shell:
> _(Note that the first column in the final output is null; it should have been "20110002456556")_
> {code:java}
> scala> val df1 = spark.sqlContext.sql("select * from decimal_test")
> df1: org.apache.spark.sql.DataFrame = [col1: decimal(30,0), col2: decimal(10,0) ... 2 more fields]
> scala> df1.show
> +------------+---------+----+-----+
> |        col1|     col2|col3| col4|
> +------------+---------+----+-----+
> |201100024...|123456789|  10|test1|
> +------------+---------+----+-----+
> scala> val decimalToString = (value: java.math.BigDecimal) => if (value == null) null else { value.toBigInteger().toString }
> decimalToString: java.math.BigDecimal => String =
> scala> val udf1 = org.apache.spark.sql.functions.udf(decimalToString)
> udf1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(,StringType,Some(List(DecimalType(38,18
> scala> val df2 = df1.withColumn("col1", udf1(df1.col("col1")))
> df2: org.apache.spark.sql.DataFrame = [col1: string, col2: decimal(10,0) ... 2 more fields]
> scala> df2.show
> +----+---------+----+-----+
> |col1|     col2|col3| col4|
> +----+---------+----+-----+
> |null|123456789|  10|test1|
> +----+---------+----+-----+
> {code}
> Oddly, this works if we change the "decimalToString" UDF to take an "Any" instead of a "java.math.BigDecimal":
> {code:java}
> scala> val decimalToString = (value: Any) => if (value == null) null else { if (value.isInstanceOf[java.math.BigDecimal]) value.asInstanceOf[java.math.BigDecimal].toBigInteger().toString else null }
> decimalToString: Any => String =
> scala> val udf1 = org.apache.spark.sql.functions.udf(decimalToString)
> udf1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(,StringType,None)
> scala> val df2 = df1.withColumn("col1", udf1(df1.col("col1")))
> df2: org.apache.spark.sql.DataFrame = [col1: string, col2: decimal(10,0) ... 2 more fields]
> scala> df2.show
> +------------+---------+----+-----+
> |        col1|     col2|col3| col4|
> +------------+---------+----+-----+
> |201100024...|123456789|  10|test1|
> +------------+---------+----+-----+
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
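Why the value turns into null: the UDF's declared input type is DecimalType(38,18) (the `Some(List(DecimalType(38,18` in the REPL output). That type leaves 38 - 18 = 20 digits for the integer part, so a decimal(30,0) value that needs more integer digits cannot be represented once the implicit cast is applied. The sketch below models that cast with `java.math.BigDecimal`. It assumes, as we understand Spark 2.x behaviour, that a cast whose result does not fit the target precision yields null instead of failing. `castToDecimal` is a hypothetical helper, not Spark code. The `Any`-typed UDF is unaffected because its input types are `None`, so no cast is inserted at all.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class DecimalCastSketch {
    // Rescales value to `scale` and returns null if the result needs more than
    // `precision` digits. Models (as an assumption) a null-on-overflow cast to
    // DECIMAL(precision, scale); this is an illustration, not Spark's implementation.
    static BigDecimal castToDecimal(BigDecimal value, int precision, int scale) {
        if (value == null) {
            return null;
        }
        BigDecimal rescaled = value.setScale(scale, RoundingMode.HALF_UP);
        // precision() counts the digits of the unscaled value.
        return rescaled.precision() > precision ? null : rescaled;
    }

    public static void main(String[] args) {
        BigDecimal longMax = BigDecimal.valueOf(Long.MAX_VALUE); // 19 integer digits
        BigDecimal tenToThe20 = BigDecimal.TEN.pow(20);          // 21 integer digits
        System.out.println(castToDecimal(longMax, 38, 18) != null);    // true: fits
        System.out.println(castToDecimal(tenToThe20, 38, 18) != null); // false: overflows to null
    }
}
```

Under this model a 19-digit value such as Long.MAX_VALUE still fits into DECIMAL(38,18); the smallest positive integer that overflows is 10^20.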
[jira] [Assigned] (SPARK-26308) Large BigDecimal value is converted to null when passed into a UDF
[ https://issues.apache.org/jira/browse/SPARK-26308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-26308:
-----------------------------------

    Assignee: Marco Gaido
[jira] [Commented] (SPARK-26318) Deprecate function merge in Row
[ https://issues.apache.org/jira/browse/SPARK-26318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725595#comment-16725595 ]

ASF GitHub Bot commented on SPARK-26318:

asfgit closed pull request #23271: [SPARK-26318][SQL] Deprecate Row.merge
URL: https://github.com/apache/spark/pull/23271

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index e12bf9616e2de..4f5af9ac80b10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -57,6 +57,7 @@ object Row {
   /**
    * Merge multiple rows into a single row, one after another.
    */
+  @deprecated("This method is deprecated and will be removed in future versions.", "3.0.0")
   def merge(rows: Row*): Row = {
     // TODO: Improve the performance of this if used in performance critical part.
     new GenericRow(rows.flatMap(_.toSeq).toArray)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Deprecate function merge in Row
> -------------------------------
>
>                 Key: SPARK-26318
>                 URL: https://issues.apache.org/jira/browse/SPARK-26318
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Liang Li
>            Assignee: Liang Li
>            Priority: Trivial
>             Fix For: 3.0.0
>
>
> It seems no one uses Row.merge. We might have to deprecate it.
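`Row.merge` simply concatenates the fields of its arguments in order, as the `rows.flatMap(_.toSeq)` body in the diff shows. A plain-Java sketch of the same semantics follows; it models rows as `List<Object>` (an assumption), and `RowMergeSketch.merge` is a hypothetical helper, not Spark's `Row` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RowMergeSketch {
    // Concatenates the fields of several rows, one after another, into one flat row,
    // mirroring rows.flatMap(_.toSeq) from the diff.
    @SafeVarargs
    static List<Object> merge(List<Object>... rows) {
        List<Object> merged = new ArrayList<>();
        for (List<Object> row : rows) {
            merged.addAll(row);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Object> a = Arrays.asList("name", "work");
        List<Object> b = Arrays.asList(2314, null);
        System.out.println(merge(a, b)); // [name, work, 2314, null]
    }
}
```

In Scala code that still needs this behaviour after the deprecation, `Row.fromSeq(rows.flatMap(_.toSeq))` builds the same flat row without calling `Row.merge`.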
[jira] [Updated] (SPARK-26318) Deprecate function merge in Row
[ https://issues.apache.org/jira/browse/SPARK-26318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-26318:
---------------------------------
    Description: It seems no one uses Row.merge. We might have to deprecate it.  (was: Enhance the performance of function merge in Row. For example, running Row.merge 1 time on the input val row1 = Row("name", "work", 2314, "null", 1, "") takes 108458 milliseconds; after some enhancement, it takes only 24967 milliseconds.)
[jira] [Resolved] (SPARK-26318) Deprecate function merge in Row
[ https://issues.apache.org/jira/browse/SPARK-26318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-26318.
----------------------------------
    Resolution: Fixed
 Fix Version/s: 3.0.0

Issue resolved by pull request 23271
[https://github.com/apache/spark/pull/23271]
[jira] [Assigned] (SPARK-26318) Deprecate function merge in Row
[ https://issues.apache.org/jira/browse/SPARK-26318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon reassigned SPARK-26318:
------------------------------------

    Assignee: Liang Li
[jira] [Updated] (SPARK-26409) SQLConf should be serializable in test sessions
[ https://issues.apache.org/jira/browse/SPARK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gengliang Wang updated SPARK-26409:
-----------------------------------
    Affects Version/s: 2.4.0

> SQLConf should be serializable in test sessions
> -----------------------------------------------
>
>                 Key: SPARK-26409
>                 URL: https://issues.apache.org/jira/browse/SPARK-26409
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Tests
>    Affects Versions: 2.4.0, 3.0.0
>            Reporter: Gengliang Wang
>            Priority: Major
>
> `SQLConf` is supposed to be serializable. However, it is currently not serializable in `WithTestConf`: `WithTestConf` uses the method `overrideConfs` in a closure, while the classes that implement it (`TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder`) are not serializable.
> Use a local variable to fix it.
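The problem described here is the general one of a closure that calls an instance method: the closure captures the enclosing object, so serializing the closure also tries to serialize that object. Spark's code is Scala; the Java analogue below uses a serializable lambda to show the effect and the local-variable fix. `TestStateBuilder`, `capturingThis`, `capturingLocal` and `SerializableSupplier` are hypothetical names chosen for illustration, and the config entry is an arbitrary example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// A Supplier that Java serialization accepts, provided everything it captures is Serializable.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

// Hypothetical stand-in for a session-state builder; deliberately NOT Serializable.
class TestStateBuilder {
    Map<String, String> overrideConfs() {
        Map<String, String> confs = new HashMap<>();
        confs.put("spark.sql.shuffle.partitions", "5"); // arbitrary example entry
        return confs;
    }

    // Problem: calling an instance method inside the lambda captures `this`.
    SerializableSupplier<Map<String, String>> capturingThis() {
        return () -> overrideConfs();
    }

    // Fix: evaluate into a local variable first; the lambda captures only the map.
    SerializableSupplier<Map<String, String>> capturingLocal() {
        Map<String, String> confs = overrideConfs();
        return () -> confs;
    }
}

final class ClosureSerialization {
    // Returns false if Java serialization rejects the object graph.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TestStateBuilder builder = new TestStateBuilder();
        System.out.println(isSerializable(builder.capturingThis()));  // false
        System.out.println(isSerializable(builder.capturingLocal())); // true
    }
}
```

The trade-off of the local-variable approach is that the configuration is read once, when the closure is created, rather than each time the closure runs.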
[jira] [Commented] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725533#comment-16725533 ]

Fengyu Cao commented on SPARK-26389:
------------------------------------

Console output forces the use of a temporary checkpoint (I just want to test my code), and there is no way to disable checkpointing.

> temp checkpoint folder at executor should be deleted on graceful shutdown
> -------------------------------------------------------------------------
>
>                 Key: SPARK-26389
>                 URL: https://issues.apache.org/jira/browse/SPARK-26389
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Fengyu Cao
>            Priority: Major
>
> {{spark-submit --master mesos:// --conf spark.streaming.stopGracefullyOnShutdown=true framework>}}
> CTRL-C, framework shutdown
> {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error org.apache.spark.SparkException: Writing job aborted.}}
> {{/tmp/temporary- on executor not deleted due to org.apache.spark.SparkException: Writing job aborted., and this temp checkpoint can't be used for recovery.}}
[jira] [Commented] (SPARK-25271) Creating parquet table with all the column null throws exception
[ https://issues.apache.org/jira/browse/SPARK-25271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725532#comment-16725532 ]

ASF GitHub Bot commented on SPARK-25271:

asfgit closed pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use data source if it is convertible
URL: https://github.com/apache/spark/pull/22514

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index e1faecedd20ed..096481f68275d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -820,6 +820,14 @@ object DDLUtils {
     table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
   }
 
+  def readHiveTable(table: CatalogTable): HiveTableRelation = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
+  }
+
   /**
    * Throws a standard error for actions that require partitionProvider = hive.
   */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index b304e2da6e1cf..b5cf8c9515bfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -244,27 +244,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     })
   }
 
-  private def readHiveTable(table: CatalogTable): LogicalPlan = {
-    HiveTableRelation(
-      table,
-      // Hive table columns are always nullable.
-      table.dataSchema.asNullable.toAttributes,
-      table.partitionSchema.asNullable.toAttributes)
-  }
-
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
         if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta))
 
     case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
-      i.copy(table = readHiveTable(tableMeta))
+      i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
     case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
       readDataSourceTable(tableMeta)
 
     case UnresolvedCatalogRelation(tableMeta) =>
-      readHiveTable(tableMeta)
+      DDLUtils.readHiveTable(tableMeta)
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5823548a8063c..03f4b8d83e353 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.util.Locale
+
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.Striped
@@ -29,6 +31,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
@@ -113,7 +117,44 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  def convertToLogicalRelation(
+  // Return true for Apache ORC and Hive ORC-related configuration names.
+  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
+  private def isOrcProperty(key: String) =
+    key.startsWith("orc.") || key.contains(".orc.")
+
+  private def isParquetProperty(key: String) =
+    key.startsWith("parquet.") || key.contains(".parquet.")
+
+  def convert(relation: HiveTableRelation): LogicalRelation = {
+    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+
+    // Consider table and storage properties. For properties existing in both sides, storage
+    // properties will supersede table properties.
+
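The two predicates in this diff classify a property key by a leading prefix (`orc.`, `parquet.`) or an embedded dotted segment (`.orc.`, `.parquet.`), and the comment states that storage properties supersede table properties. A Java sketch of both rules follows. `HivePropertySketch` is hypothetical, and because the diff is cut off before the merged properties are used, applying the Parquet filter to table properties only (while copying storage properties as-is) is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class HivePropertySketch {
    // Same string tests as isOrcProperty / isParquetProperty in the diff.
    static boolean isOrcProperty(String key) {
        return key.startsWith("orc.") || key.contains(".orc.");
    }

    static boolean isParquetProperty(String key) {
        return key.startsWith("parquet.") || key.contains(".parquet.");
    }

    // Keeps Parquet-related table properties, then overlays all storage properties,
    // so a key present on both sides takes the storage value (assumed usage).
    static Map<String, String> mergeProperties(Map<String, String> tableProps,
                                               Map<String, String> storageProps) {
        Map<String, String> merged = new TreeMap<>();
        for (Map.Entry<String, String> e : tableProps.entrySet()) {
            if (isParquetProperty(e.getKey())) {
                merged.put(e.getKey(), e.getValue());
            }
        }
        merged.putAll(storageProps); // storage properties supersede table properties
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("parquet.compression", "GZIP");
        table.put("orc.compress", "ZLIB");
        Map<String, String> storage = new HashMap<>();
        storage.put("parquet.compression", "SNAPPY");
        System.out.println(mergeProperties(table, storage)); // {parquet.compression=SNAPPY}
    }
}
```

Note that `isOrcProperty("hive.merge.orcfile.stripe.level")` returns false, because `.orcfile.` does not contain the segment `.orc.`; this is consistent with the diff's comment that such configurations are not supported.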
[jira] [Assigned] (SPARK-25271) Creating parquet table with all the column null throws exception
[ https://issues.apache.org/jira/browse/SPARK-25271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-25271:
-----------------------------------

    Assignee: Liang-Chi Hsieh

> Creating parquet table with all the column null throws exception
> ----------------------------------------------------------------
>
>                 Key: SPARK-25271
>                 URL: https://issues.apache.org/jira/browse/SPARK-25271
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Shivu Sondur
>            Assignee: Liang-Chi Hsieh
>            Priority: Critical
>             Fix For: 3.0.0
>
>         Attachments: image-2018-09-07-09-12-34-944.png, image-2018-09-07-09-29-33-370.png, image-2018-09-07-09-29-52-899.png, image-2018-09-07-09-32-43-892.png, image-2018-09-07-09-33-03-095.png
>
>
> {code:java}
> 1) cat /data/parquet.dat
> 1$abc2$pqr:3$xyz
> null{code}
>
> {code:java}
> 2) spark.sql("create table vp_reader_temp (projects map) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':' MAP KEYS TERMINATED BY '$'")
> {code}
> {code:java}
> 3) spark.sql("LOAD DATA LOCAL INPATH '/data/parquet.dat' INTO TABLE vp_reader_temp")
> {code}
> {code:java}
> 4) spark.sql("create table vp_reader STORED AS PARQUET as select * from vp_reader_temp")
> {code}
> *Result:* Throws an exception (works fine with Spark 2.2.1)
> {code:java}
> java.lang.RuntimeException: Parquet record is malformed: empty fields are illegal, the field should be ommited completely instead
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:64)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:123)
>     at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:180)
>     at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:46)
>     at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:112)
>     at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:125)
>     at org.apache.spark.sql.hive.execution.HiveOutputWriter.write(HiveFileFormat.scala:149)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:406)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:283)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:281)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1438)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:286)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:211)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:210)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:109)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:349)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead
>     at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:320)
>     at org.apache.parquet.io.RecordConsumerLoggingWrapper.endField(RecordConsumerLoggingWrapper.java:165)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeMap(DataWritableWriter.java:241)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:116)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:60)
[jira] [Resolved] (SPARK-25271) Creating parquet table with all the column null throws exception
[ https://issues.apache.org/jira/browse/SPARK-25271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-25271. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 22514 [https://github.com/apache/spark/pull/22514] > Creating parquet table with all the column null throws exception > > > Key: SPARK-25271 > URL: https://issues.apache.org/jira/browse/SPARK-25271 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: Shivu Sondur >Assignee: Liang-Chi Hsieh >Priority: Critical > Fix For: 3.0.0 > > Attachments: image-2018-09-07-09-12-34-944.png, > image-2018-09-07-09-29-33-370.png, image-2018-09-07-09-29-52-899.png, > image-2018-09-07-09-32-43-892.png, image-2018-09-07-09-33-03-095.png > > > {code:java} > 1)cat /data/parquet.dat > 1$abc2$pqr:3$xyz > null{code} > > {code:java} > 2)spark.sql("create table vp_reader_temp (projects map) ROW > FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':' > MAP KEYS TERMINATED BY '$'") > {code} > {code:java} > 3)spark.sql(" > LOAD DATA LOCAL INPATH '/data/parquet.dat' INTO TABLE vp_reader_temp") > {code} > {code:java} > 4)spark.sql("create table vp_reader STORED AS PARQUET as select * from > vp_reader_temp") > {code} > *Result :* Throwing exception (Working fine with spark 2.2.1) > {code:java} > java.lang.RuntimeException: Parquet record is malformed: empty fields are > illegal, the field should be ommited completely instead > at > org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:64) > at > org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59) > at > org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31) > at > org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:123) > at > 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:180) > at > org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:46) > at > org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:112) > at > org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:125) > at > org.apache.spark.sql.hive.execution.HiveOutputWriter.write(HiveFileFormat.scala:149) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:406) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:283) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:281) > at > org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1438) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:286) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:211) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:210) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:349) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are > 
illegal, the field should be ommited completely instead > at > org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:320) > at > org.apache.parquet.io.RecordConsumerLoggingWrapper.endField(RecordConsumerLoggingWrapper.java:165) > at > org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeMap(DataWritableWriter.java:241) > at > org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:116) > at > org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89) > at >
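The Parquet error above states a writer-side rule: a field that is started must receive at least one value, otherwise it has to be left out entirely. The Scala sketch below illustrates that rule with a simplified, hypothetical record consumer. StrictRecordConsumer and writeMapField are illustrative names, not Parquet or Hive APIs. It shows why a writer has to check a map for null or emptiness before starting its field.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of a Parquet-style record consumer:
// every started field must receive at least one value before it is ended.
final class StrictRecordConsumer {
  private var valuesInCurrentField = 0
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def startField(name: String): Unit = {
    valuesInCurrentField = 0
    events += s"start:$name"
  }

  def addValue(value: String): Unit = {
    valuesInCurrentField += 1
    events += s"value:$value"
  }

  def endField(name: String): Unit = {
    // Mirrors the rule in the error message: empty fields must be omitted.
    if (valuesInCurrentField == 0) {
      throw new IllegalStateException("empty fields are illegal; omit the field instead")
    }
    events += s"end:$name"
  }
}

object MapFieldWriter {
  // Writes one map column. A null or empty map is skipped entirely instead
  // of being emitted as a started-but-empty field.
  def writeMapField(consumer: StrictRecordConsumer, name: String, m: Map[String, String]): Unit = {
    if (m != null && m.nonEmpty) {
      consumer.startField(name)
      m.foreach { case (k, v) => consumer.addValue(s"$k=$v") }
      consumer.endField(name)
    }
  }
}
```

Calling startField and then endField directly, with no value in between, throws. That mirrors the failure in the trace. The sketch does not describe what pull request 22514 actually changed.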
[jira] [Comment Edited] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725528#comment-16725528 ] Fengyu Cao edited comment on SPARK-26389 at 12/20/18 2:45 AM: -- thanks for reply Two scenarios: # {{temp checkpoint dir /tmp/temporary- on worker node}} # framework restart # {{temp checkpoint dir now /tmp/temporary- (/tmp/temporary- can't used to recovery and should be deleted)}} # {{temp checkpoint dir /tmp/temporary- on worker node}} # executor stop in some reason # executor start on another worker nodes (/tmp/temporary- can't used to recovery either) Maybe temp checkpoint dir should be deleted on JVM stop? {quote}spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not Structured Streaming one. {quote} sorry, I didn't notice this. was (Author: camper42): thanks for reply Two scenarios: # {{temp checkpoint dir /tmp/temporary-}} # framework restart # {{temp checkpoint dir now /tmp/temporary- (/tmp/temporary- can't used to recovery and should be deleted)}} # {{temp checkpoint dir /tmp/temporary-}} # executor stop in some reason # executor start on another worker nodes (/tmp/temporary- can't used to recovery either) Maybe temp checkpoint dir should be deleted on JVM stop? {quote}spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not Structured Streaming one. {quote} sorry, I didn't notice this. 
> temp checkpoint folder at executor should be deleted on graceful shutdown > - > > Key: SPARK-26389 > URL: https://issues.apache.org/jira/browse/SPARK-26389 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Fengyu Cao >Priority: Major > > {{spark-submit --master mesos:// -conf > spark.streaming.stopGracefullyOnShutdown=true framework>}} > CTRL-C, framework shutdown > {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = > f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = > 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error > org.apache.spark.SparkException: Writing job aborted.}} > {{/tmp/temporary- on executor not deleted due to > org.apache.spark.SparkException: Writing job aborted., and this temp > checkpoint can't used to recovery.}} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
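The comment asks whether the temporary checkpoint directory could be deleted when the JVM stops. The Scala sketch below illustrates that idea with the standard sys.addShutdownHook. It assumes the process knows the directory path. TempCheckpointCleanup and registerTempCheckpointCleanup are hypothetical names, and this is not how Structured Streaming manages its temporary checkpoints. A shutdown hook runs only on an orderly JVM exit (for example after Ctrl-C or SIGTERM), not after SIGKILL or a crash.

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator

object TempCheckpointCleanup {
  // Deletes a directory tree. Reverse path order visits children before
  // their parents, so every directory is empty when it is deleted.
  def deleteRecursively(root: Path): Unit = {
    if (Files.exists(root)) {
      val stream = Files.walk(root)
      try {
        stream.sorted(Comparator.reverseOrder[Path]()).forEach((p: Path) => Files.delete(p))
      } finally {
        stream.close()
      }
    }
  }

  // Hypothetical helper: remove the temporary checkpoint directory when the
  // JVM shuts down in an orderly way.
  def registerTempCheckpointCleanup(dir: Path): Unit = {
    sys.addShutdownHook(deleteRecursively(dir))
  }
}
```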
[jira] [Commented] (SPARK-26262) Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and CODEGEN_FACTORY_MODE
[ https://issues.apache.org/jira/browse/SPARK-26262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725530#comment-16725530 ] ASF GitHub Bot commented on SPARK-26262: asfgit closed pull request #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and CODEGEN_FACTORY_MODE URL: https://github.com/apache/spark/pull/23213 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql index ec263ea70bd4a..7e81ff1aba37b 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql @@ -141,8 +141,3 @@ SELECT every("true"); SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; SELECT k, v, some(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; SELECT k, v, any(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; - --- simple explain of queries having every/some/any agregates. Optimized --- plan should show the rewritten aggregate expression. 
-EXPLAIN EXTENDED SELECT k, every(v), some(v), any(v) FROM test_agg GROUP BY k; - diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index 41d316444ed6b..b3ec956cd178e 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -49,6 +49,3 @@ select * from values ("one", count(1)), ("two", 2) as data(a, b); -- string to timestamp select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b); - --- cross-join inline tables -EXPLAIN EXTENDED SELECT * FROM VALUES ('one', 1), ('three', null) CROSS JOIN VALUES ('one', 1), ('three', null); diff --git a/sql/core/src/test/resources/sql-tests/inputs/operators.sql b/sql/core/src/test/resources/sql-tests/inputs/operators.sql index 37f9cd44da7f2..ba14789d48db6 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/operators.sql @@ -29,27 +29,6 @@ select 2 * 5; select 5 % 3; select pmod(-7, 3); --- check operator precedence. 
--- We follow Oracle operator precedence in the table below that lists the levels of precedence --- among SQL operators from high to low: --- --- Operator Operation --- --- +, - identity, negation --- *, / multiplication, division --- +, -, || addition, subtraction, concatenation --- =, !=, <, >, <=, >=, IS NULL, LIKE, BETWEEN, IN comparison --- NOT exponentiation, logical negation --- AND conjunction --- ORdisjunction --- -explain select 'a' || 1 + 2; -explain select 1 - 2 || 'b'; -explain select 2 * 4 + 3 || 'b'; -explain select 3 + 1 || 'a' || 4 / 2; -explain select 1 == 1 OR 'a' || 'b' == 'ab'; -explain select 'a' || 'c' == 'ac' AND 2 == 3; - -- math functions select cot(1); select cot(null); diff --git a/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql index f1461032065ad..1ae49c8bfc76a 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/sql-compatibility-functions.sql @@ -12,11 +12,6 @@ SELECT nullif(1, 2.1d), nullif(1, 1.0d); SELECT nvl(1, 2.1d), nvl(null, 2.1d); SELECT nvl2(null, 1, 2.1d), nvl2('n', 1, 2.1d); --- explain for these functions; use range to avoid constant folding -explain extended -select ifnull(id, 'x'), nullif(id, 'x'), nvl(id, 'x'), nvl2(id, 'x', 'y') -from range(2); - -- SPARK-16730 cast alias functions for Hive compatibility SELECT boolean(1), tinyint(1), smallint(1), int(1), bigint(1); SELECT float(1), double(1), decimal(1); diff --git a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql index 2effb43183d75..fbc231627e36f 100644 ---
[jira] [Resolved] (SPARK-26262) Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and CODEGEN_FACTORY_MODE
[ https://issues.apache.org/jira/browse/SPARK-26262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-26262. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23213 [https://github.com/apache/spark/pull/23213] > Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and > CODEGEN_FACTORY_MODE > > > Key: SPARK-26262 > URL: https://issues.apache.org/jira/browse/SPARK-26262 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Takeshi Yamamuro >Assignee: Takeshi Yamamuro >Priority: Minor > Fix For: 3.0.0 > > > For better test coverage, we need to run `SQLQueryTestSuite` on 4 mixed > config sets: > 1. WHOLESTAGE_CODEGEN_ENABLED=true, CODEGEN_FACTORY_MODE=CODEGEN_ONLY > 2. WHOLESTAGE_CODEGEN_ENABLED=false, CODEGEN_FACTORY_MODE=CODEGEN_ONLY > 3. WHOLESTAGE_CODEGEN_ENABLED=true, CODEGEN_FACTORY_MODE=NO_CODEGEN > 4. WHOLESTAGE_CODEGEN_ENABLED=false, CODEGEN_FACTORY_MODE=NO_CODEGEN
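The four combinations listed above are the cross product of two values for each setting. The Scala sketch below enumerates them in the ticket's order. It assumes the SQL config keys behind the two constants are spark.sql.codegen.wholeStage and spark.sql.codegen.factoryMode; check SQLConf for your Spark version. The object name CodegenConfigMatrix is hypothetical.

```scala
object CodegenConfigMatrix {
  // Assumed keys for WHOLESTAGE_CODEGEN_ENABLED and CODEGEN_FACTORY_MODE.
  val WholeStageKey = "spark.sql.codegen.wholeStage"
  val FactoryModeKey = "spark.sql.codegen.factoryMode"

  // Factory mode is the outer loop so the order matches items 1-4 in the ticket.
  val configSets: Seq[Seq[(String, String)]] =
    for {
      factoryMode <- Seq("CODEGEN_ONLY", "NO_CODEGEN")
      wholeStage <- Seq("true", "false")
    } yield Seq(WholeStageKey -> wholeStage, FactoryModeKey -> factoryMode)
}
```

Spark's SQL test utilities include a withSQLConf(pairs: _*) { ... } helper, which could apply each set around a query run. That is one plausible wiring, not necessarily what pull request 23213 does.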
[jira] [Comment Edited] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725528#comment-16725528 ] Fengyu Cao edited comment on SPARK-26389 at 12/20/18 2:44 AM: -- thanks for reply Two scenarios: # {{temp checkpoint dir /tmp/temporary-}} # framework restart # {{temp checkpoint dir now /tmp/temporary- (/tmp/temporary- can't used to recovery and should be deleted)}} # {{temp checkpoint dir /tmp/temporary-}} # executor stop in some reason # executor start on another worker nodes (/tmp/temporary- can't used to recovery either) Maybe temp checkpoint dir should be deleted on JVM stop? {quote}spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not Structured Streaming one. {quote} sorry, I didn't notice this. was (Author: camper42): thanks for reply Two scenarios: # {{temp checkpoint dir /tmp/temporary-}} # framework restart # {{temp checkpoint dir now /tmp/temporary- (/tmp/temporary- can't used to recovery and should be deleted)}} # {{temp checkpoint dir /tmp/temporary-}} # executor stop in some reason # executor start on another worker nodes (/tmp/temporary- can't used to recovery either) May be temp checkpoint dir should be deleted on JVM stop? {quote}spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not Structured Streaming one. {quote} sorry, I didn't notice this. 
> temp checkpoint folder at executor should be deleted on graceful shutdown > - > > Key: SPARK-26389 > URL: https://issues.apache.org/jira/browse/SPARK-26389 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Fengyu Cao >Priority: Major > > {{spark-submit --master mesos:// -conf > spark.streaming.stopGracefullyOnShutdown=true framework>}} > CTRL-C, framework shutdown > {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = > f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = > 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error > org.apache.spark.SparkException: Writing job aborted.}} > {{/tmp/temporary- on executor not deleted due to > org.apache.spark.SparkException: Writing job aborted., and this temp > checkpoint can't used to recovery.}} >
[jira] [Assigned] (SPARK-26262) Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and CODEGEN_FACTORY_MODE
[ https://issues.apache.org/jira/browse/SPARK-26262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-26262: --- Assignee: Takeshi Yamamuro > Runs SQLQueryTestSuite on mixed config sets: WHOLESTAGE_CODEGEN_ENABLED and > CODEGEN_FACTORY_MODE > > > Key: SPARK-26262 > URL: https://issues.apache.org/jira/browse/SPARK-26262 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Takeshi Yamamuro >Assignee: Takeshi Yamamuro >Priority: Minor > Fix For: 3.0.0 > > > For better test coverage, we need to run `SQLQueryTestSuite` on 4 mixed > config sets: > 1. WHOLESTAGE_CODEGEN_ENABLED=true, CODEGEN_FACTORY_MODE=CODEGEN_ONLY > 2. WHOLESTAGE_CODEGEN_ENABLED=false, CODEGEN_FACTORY_MODE=CODEGEN_ONLY > 3. WHOLESTAGE_CODEGEN_ENABLED=true, CODEGEN_FACTORY_MODE=NO_CODEGEN > 4. WHOLESTAGE_CODEGEN_ENABLED=false, CODEGEN_FACTORY_MODE=NO_CODEGEN
[jira] [Commented] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725528#comment-16725528 ] Fengyu Cao commented on SPARK-26389: thanks for reply Two scenarios: # {{temp checkpoint dir /tmp/temporary-}} # framework restart # {{temp checkpoint dir now /tmp/temporary- (/tmp/temporary- can't used to recovery and should be deleted)}} # {{temp checkpoint dir /tmp/temporary-}} # executor stop in some reason # executor start on another worker nodes (/tmp/temporary- can't used to recovery either) May be temp checkpoint dir should be deleted on JVM stop? {quote}spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not Structured Streaming one. {quote} sorry, I didn't notice this. > temp checkpoint folder at executor should be deleted on graceful shutdown > - > > Key: SPARK-26389 > URL: https://issues.apache.org/jira/browse/SPARK-26389 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Fengyu Cao >Priority: Major > > {{spark-submit --master mesos:// -conf > spark.streaming.stopGracefullyOnShutdown=true framework>}} > CTRL-C, framework shutdown > {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = > f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = > 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error > org.apache.spark.SparkException: Writing job aborted.}} > {{/tmp/temporary- on executor not deleted due to > org.apache.spark.SparkException: Writing job aborted., and this temp > checkpoint can't used to recovery.}} >
[jira] [Updated] (SPARK-26417) Make comments for states available for logging
[ https://issues.apache.org/jira/browse/SPARK-26417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evgenii Lartcev updated SPARK-26417: Description: When add an extra `SparkListener`, it would be very useful (in production applications) to get the description of a state. For now it only available as comments in code. As suggestion it could be moved into constructor as well as finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State (was: When add an extra `SparkListener`, it would be very useful (in production applications) to get the description of a state. Now it only available as comments in code. As suggestion it could be moved into constructor as well as finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State) > Make comments for states available for logging > -- > > Key: SPARK-26417 > URL: https://issues.apache.org/jira/browse/SPARK-26417 > Project: Spark > Issue Type: Wish > Components: Java API, Spark Core >Affects Versions: 2.3.2, 2.4.0 >Reporter: Evgenii Lartcev >Priority: Minor > > When add an extra `SparkListener`, it would be very useful (in production > applications) to get the description of a state. For now it only available as > comments in code. As suggestion it could be moved into constructor as well as > finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State
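The wish is to carry each state's description, alongside the final-state flag, as data on the state itself rather than only in source comments. The Scala sketch below shows that shape using a hypothetical AppState type, not the real org.apache.spark.launcher.SparkAppHandle.State, which is a Java enum. The state names follow that enum, but the description strings are illustrative paraphrases, not its Javadoc.

```scala
// Hypothetical state type: each state carries a human-readable description
// and whether it is final, so a listener can log both.
sealed abstract class AppState(val description: String, val isFinal: Boolean)

object AppState {
  case object Unknown   extends AppState("The application has not reported back yet.", isFinal = false)
  case object Connected extends AppState("The application has connected to the launcher.", isFinal = false)
  case object Submitted extends AppState("The application has been submitted to the cluster.", isFinal = false)
  case object Running   extends AppState("The application is running.", isFinal = false)
  case object Finished  extends AppState("The application finished successfully.", isFinal = true)
  case object Failed    extends AppState("The application finished with a failure.", isFinal = true)
  case object Killed    extends AppState("The application was killed.", isFinal = true)
  case object Lost      extends AppState("The launcher lost track of the application.", isFinal = true)

  val values: Seq[AppState] =
    Seq(Unknown, Connected, Submitted, Running, Finished, Failed, Killed, Lost)

  // A log line a listener might emit on a state change.
  def logLine(state: AppState): String =
    s"state=$state final=${state.isFinal}: ${state.description}"
}
```

For example, AppState.logLine(AppState.Running) yields "state=Running final=false: The application is running."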
[jira] [Updated] (SPARK-26417) Make comments for states available for logging
[ https://issues.apache.org/jira/browse/SPARK-26417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evgenii Lartcev updated SPARK-26417: Description: When add an extra `SparkListener`, it would be very useful (in production applications) to get the description of a state. Now it only available as comments in code. As suggestion it could be moved into constructor as well as finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State (was: When add an extra `SparkListener`, it would be very useful (in production applications) to get the description of a state. Now it only available in comments. As suggestion it could be moved into constructor as well as finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State) > Make comments for states available for logging > -- > > Key: SPARK-26417 > URL: https://issues.apache.org/jira/browse/SPARK-26417 > Project: Spark > Issue Type: Wish > Components: Java API, Spark Core >Affects Versions: 2.3.2, 2.4.0 >Reporter: Evgenii Lartcev >Priority: Minor > > When add an extra `SparkListener`, it would be very useful (in production > applications) to get the description of a state. Now it only available as > comments in code. As suggestion it could be moved into constructor as well as > finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State
[jira] [Updated] (SPARK-26417) Make comments for states available for logging
[ https://issues.apache.org/jira/browse/SPARK-26417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evgenii Lartcev updated SPARK-26417: Description: When add an extra `SparkListener`, it would be very useful (in production applications) to get the description of a state. Now it only available in comments. As suggestion it could be moved into constructor as well as finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State (was: When add an extra `SparkListener`, it would be very useful in production mode to get the description of a state. Now it only available in comments. As suggestion it could be moved into constructor as well as finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State) > Make comments for states available for logging > -- > > Key: SPARK-26417 > URL: https://issues.apache.org/jira/browse/SPARK-26417 > Project: Spark > Issue Type: Wish > Components: Java API, Spark Core >Affects Versions: 2.3.2, 2.4.0 >Reporter: Evgenii Lartcev >Priority: Minor > > When add an extra `SparkListener`, it would be very useful (in production > applications) to get the description of a state. Now it only available in > comments. As suggestion it could be moved into constructor as well as > finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State
[jira] [Updated] (SPARK-26417) Make comments for states available for logging
[ https://issues.apache.org/jira/browse/SPARK-26417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evgenii Lartcev updated SPARK-26417: Affects Version/s: 2.3.2 > Make comments for states available for logging > -- > > Key: SPARK-26417 > URL: https://issues.apache.org/jira/browse/SPARK-26417 > Project: Spark > Issue Type: Wish > Components: Java API, Spark Core >Affects Versions: 2.3.2, 2.4.0 >Reporter: Evgenii Lartcev >Priority: Minor > > When add an extra `SparkListener`, it would be very useful in production mode > to get the description of a state. Now it only available in comments. As > suggestion it could be moved into constructor as well as finalization flag at > enum org.apache.spark.launcher.SparkAppHandle.State
[jira] [Updated] (SPARK-26417) Make comments for states available for logging
[ https://issues.apache.org/jira/browse/SPARK-26417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evgenii Lartcev updated SPARK-26417: Description: When add an extra `SparkListener`, it would be very useful in production mode to get the description of a state. Now it only available in comments. As suggestion it could be moved into constructor as well as finalization flag at enum org.apache.spark.launcher.SparkAppHandle.State (was: When add an extra `SparkListener`, it would be very useful in production mode to get the description of a state. Now it only available in comments. As suggestion it could be moved into constructor as well as finalization flag.) > Make comments for states available for logging > -- > > Key: SPARK-26417 > URL: https://issues.apache.org/jira/browse/SPARK-26417 > Project: Spark > Issue Type: Wish > Components: Java API, Spark Core >Affects Versions: 2.3.2, 2.4.0 >Reporter: Evgenii Lartcev >Priority: Minor > > When add an extra `SparkListener`, it would be very useful in production mode > to get the description of a state. Now it only available in comments. As > suggestion it could be moved into constructor as well as finalization flag at > enum org.apache.spark.launcher.SparkAppHandle.State
[jira] [Created] (SPARK-26417) Make comments for states available for logging
Evgenii Lartcev created SPARK-26417: --- Summary: Make comments for states available for logging Key: SPARK-26417 URL: https://issues.apache.org/jira/browse/SPARK-26417 Project: Spark Issue Type: Wish Components: Java API, Spark Core Affects Versions: 2.4.0 Reporter: Evgenii Lartcev When add an extra `SparkListener`, it would be very useful in production mode to get the description of a state. Now it only available in comments. As suggestion it could be moved into constructor as well as finalization flag.
[jira] [Created] (SPARK-26416) Refactor `ColumnPruning` from `Optimizer.scala` to `ColumnPruning.scala`
DB Tsai created SPARK-26416: --- Summary: Refactor `ColumnPruning` from `Optimizer.scala` to `ColumnPruning.scala` Key: SPARK-26416 URL: https://issues.apache.org/jira/browse/SPARK-26416 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: DB Tsai Assignee: DB Tsai Fix For: 3.0.0 As `Optimizer.scala` becomes bigger and bigger, it's hard to add new rules and maintain them. We are refactoring out `ColumnPruning` from `Optimizer.scala` to `ColumnPruning.scala` so it's easier to add new logics in `ColumnPruning`.
[jira] [Commented] (SPARK-26374) Support new date/timestamp parser in HadoopFsRelationTest
[ https://issues.apache.org/jira/browse/SPARK-26374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725422#comment-16725422 ] Maxim Gekk commented on SPARK-26374: Most likely this is the related ticket: https://bugs.openjdk.java.net/browse/JDK-8181465 > Support new date/timestamp parser in HadoopFsRelationTest > - > > Key: SPARK-26374 > URL: https://issues.apache.org/jira/browse/SPARK-26374 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Priority: Minor > > The *test all data types* test uses the legacy parser for dates/timestamps > (https://github.com/apache/spark/pull/23196/files#diff-3986f801dd1335af3f300b006a03e987R129). > The test is passed on UTC timezone and new parser but fails on different > timezones > ([see|https://github.com/apache/spark/pull/23196#discussion_r241492360]): > {code} > == Correct Answer - 10 == == Spark Answer - 10 == > ... > ![10,1670-02-11 14:09:54.746] [10,1670-02-11 14:08:56.746] > == Correct Answer - 10 == == Spark Answer - 10 == > [1,6246-07-23 20:34:56.968] [1,6246-07-23 20:34:56.968] > ![2,0109-07-20 18:38:03.788] [2,0109-07-20 18:37:05.788] > {code} > The ticket aims to switching on new parser independently from timezones.
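The mismatches above are constant 58-second shifts for dates centuries in the past, while UTC is unaffected. One plausible contributor, offered as an illustration rather than a diagnosis of this ticket, is that java.time applies the time-zone database's historical local mean time (LMT) offsets. Before a zone adopted standard time, these offsets are not whole minutes. The Scala check below uses java.time only. America/Los_Angeles is an example zone chosen here, not necessarily the zone the test ran in, and the object name is hypothetical.

```scala
import java.time.{Instant, ZoneId, ZoneOffset}

object HistoricalOffsets {
  // Offset that java.time's zone rules apply to the given instant.
  def offsetAt(zone: String, instant: String): ZoneOffset =
    ZoneId.of(zone).getRules.getOffset(Instant.parse(instant))

  // True when the offset is not a whole number of minutes.
  def hasSubMinuteOffset(offset: ZoneOffset): Boolean =
    offset.getTotalSeconds % 60 != 0
}
```

A parser that honours the seconds component and one that does not would disagree by a sub-minute amount for such historical dates, but agree for UTC and for modern dates.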
[jira] [Commented] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725418#comment-16725418 ] Gabor Somogyi commented on SPARK-26389: --- {quote}can't used to recovery{quote} What does this mean actually? From use-case perspective: 1. if you want to recover why don't you just set a not temp checkpoint? 2. if you want to recover why do you want it to get deleted? > temp checkpoint folder at executor should be deleted on graceful shutdown > - > > Key: SPARK-26389 > URL: https://issues.apache.org/jira/browse/SPARK-26389 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Fengyu Cao >Priority: Major > > {{spark-submit --master mesos:// -conf > spark.streaming.stopGracefullyOnShutdown=true framework>}} > CTRL-C, framework shutdown > {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = > f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = > 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error > org.apache.spark.SparkException: Writing job aborted.}} > {{/tmp/temporary- on executor not deleted due to > org.apache.spark.SparkException: Writing job aborted., and this temp > checkpoint can't used to recovery.}} >
[jira] [Commented] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable
[ https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725416#comment-16725416 ] Grant Henke commented on SPARK-26415: - Apologies, feel free to adjust. I set the versions I think would be appropriate given the interface has been the same since those versions/branches were released. > Mark StreamSinkProvider and StreamSourceProvider as stable > -- > > Key: SPARK-26415 > URL: https://issues.apache.org/jira/browse/SPARK-26415 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Grant Henke >Priority: Major > > This change marks the StreamSinkProvider and StreamSourceProvider > interfaces as stable so that it can be relied on for compatibility for all of > Spark 2.x. > These interfaces have been available since Spark 2.0.0 and unchanged > since Spark 2.1.0. Additionally the Kafka integration has been using it > since Spark 2.1.0. > Because structured streaming general availability was announced in > Spark 2.2.0, I suspect there are other third-party integrations using it > already as well.
[jira] [Commented] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725413#comment-16725413 ] Gabor Somogyi commented on SPARK-26389: --- spark.streaming.stopGracefullyOnShutdown is a DStreams parameter and not Structured Streaming one. > temp checkpoint folder at executor should be deleted on graceful shutdown > - > > Key: SPARK-26389 > URL: https://issues.apache.org/jira/browse/SPARK-26389 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Fengyu Cao >Priority: Major > > {{spark-submit --master mesos:// -conf > spark.streaming.stopGracefullyOnShutdown=true framework>}} > CTRL-C, framework shutdown > {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = > f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = > 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error > org.apache.spark.SparkException: Writing job aborted.}} > {{/tmp/temporary- on executor not deleted due to > org.apache.spark.SparkException: Writing job aborted., and this temp > checkpoint can't used to recovery.}} >
[jira] [Comment Edited] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable
[ https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725395#comment-16725395 ] Gabor Somogyi edited comment on SPARK-26415 at 12/19/18 10:30 PM: -- One minor thing: AFAIK target version should be set by committers. was (Author: gsomogyi): One minor thing: AFAIK targer version should be set by committers. > Mark StreamSinkProvider and StreamSourceProvider as stable > -- > > Key: SPARK-26415 > URL: https://issues.apache.org/jira/browse/SPARK-26415 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Grant Henke >Priority: Major > > This change marks the StreamSinkProvider and StreamSourceProvider > interfaces as stable so that it can be relied on for compatibility for all of > Spark 2.x. > These interfaces have been available since Spark 2.0.0 and unchanged > since Spark 2.1.0. Additionally the Kafka integration has been using it > since Spark 2.1.0. > Because structured streaming general availability was announced in > Spark 2.2.0, I suspect there are other third-party integrations using it > already as well.
[jira] [Commented] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable
[ https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725395#comment-16725395 ] Gabor Somogyi commented on SPARK-26415: --- One minor thing: AFAIK targer version should be set by committers. > Mark StreamSinkProvider and StreamSourceProvider as stable > -- > > Key: SPARK-26415 > URL: https://issues.apache.org/jira/browse/SPARK-26415 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Grant Henke >Priority: Major > > This change marks the StreamSinkProvider and StreamSourceProvider > interfaces as stable so that it can be relied on for compatibility for all of > Spark 2.x. > These interfaces have been available since Spark 2.0.0 and unchanged > since Spark 2.1.0. Additionally the Kafka integration has been using it > since Spark 2.1.0. > Because structured streaming general availability was announced in > Spark 2.2.0, I suspect there are other third-party integrations using it > already as well.
[jira] [Commented] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
[ https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725393#comment-16725393 ] Gabor Somogyi commented on SPARK-26396: --- Should be. Regarding this JIRA, the described use case is not how it should be designed. Unless you have further issues, I would like to close it with the info provided. > Kafka consumer cache overflow since 2.4.x > - > > Key: SPARK-26396 > URL: https://issues.apache.org/jira/browse/SPARK-26396 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 > Environment: Spark 2.4 standalone client mode >Reporter: Kaspar Tint >Priority: Major > > We are experiencing an issue where the Kafka consumer cache seems to overflow > constantly upon starting the application. This issue appeared after upgrading > to Spark 2.4. > We would get constant warnings like this: > {code:java} > 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max > capacity of 180, removing consumer for > CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76) > 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max > capacity of 180, removing consumer for > CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30) > 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max > capacity of 180, removing consumer for > CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57) > 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max > capacity of 180, removing consumer for > CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43) > {code} > This application is running 4 different Spark Structured Streaming queries > against the same Kafka topic that has 90 partitions.
We used to run it with > just the default settings, so it defaulted to cache size 64 on Spark 2.3, but > now we tried setting it to 180 or 360. With 360 we get a lot less noise > about the overflow, but the resource need increases substantially.
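A simplified, hypothetical model of why the warnings persist: assuming the executor-side cache behaves like an LRU map keyed by (consumer group, topic, partition), as the CacheKey entries in the log suggest, 4 queries over 90 partitions need 4 × 90 = 360 distinct keys. When the capacity is smaller than that and the keys are visited round-robin, LRU evicts each consumer just before it is needed again. LruConsumerCache and simulate are illustrative names only, not Spark's KafkaDataConsumer code.

```python
from collections import OrderedDict
from typing import Tuple


class LruConsumerCache:
    """Hypothetical capacity-bounded LRU cache of Kafka consumers (not Spark's code)."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.entries = OrderedDict()  # (group, topic, partition) -> consumer
        self.misses = 0
        self.evictions = 0

    def acquire(self, group: str, topic: str, partition: int) -> str:
        key = (group, topic, partition)
        if key in self.entries:
            self.entries.move_to_end(key)  # mark as most recently used
            return self.entries[key]
        self.misses += 1
        if len(self.entries) >= self.capacity:
            self.entries.popitem(last=False)  # evict the least recently used consumer
            self.evictions += 1
        consumer = f"consumer-{group}-{topic}-{partition}"  # stands in for a real consumer
        self.entries[key] = consumer
        return consumer


def simulate(capacity: int, queries: int = 4, partitions: int = 90,
             rounds: int = 3) -> Tuple[int, int]:
    """Each round, every query reads every partition of the shared topic once."""
    cache = LruConsumerCache(capacity)
    for _ in range(rounds):
        for query in range(queries):
            for partition in range(partitions):
                cache.acquire(f"spark-kafka-source-query{query}", "kafka-topic", partition)
    return cache.misses, cache.evictions


for capacity in (64, 180, 360):
    print(capacity, simulate(capacity))
# 64 (1080, 1016)
# 180 (1080, 900)
# 360 (360, 0)
```

Under this model a capacity of 180 misses on every one of the 1080 accesses, while 360 only misses during the first round, which is consistent with the report that 360 removes most of the noise at the cost of keeping more consumers open.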
[jira] [Commented] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable
[ https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725376#comment-16725376 ] Apache Spark commented on SPARK-26415: -- User 'granthenke' has created a pull request for this issue: https://github.com/apache/spark/pull/23354 > Mark StreamSinkProvider and StreamSourceProvider as stable > -- > > Key: SPARK-26415 > URL: https://issues.apache.org/jira/browse/SPARK-26415 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Grant Henke >Priority: Major > > This change marks the StreamSinkProvider and StreamSourceProvider > interfaces as stable so that they can be relied on for compatibility for all of > Spark 2.x. > These interfaces have been available since Spark 2.0.0 and unchanged > since Spark 2.1.0. Additionally, the Kafka integration has been using them > since Spark 2.1.0. > Because Structured Streaming general availability was announced in > Spark 2.2.0, I suspect there are other third-party integrations using them > already as well.
[jira] [Assigned] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable
[ https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26415: Assignee: Apache Spark > Mark StreamSinkProvider and StreamSourceProvider as stable > -- > > Key: SPARK-26415 > URL: https://issues.apache.org/jira/browse/SPARK-26415 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Grant Henke >Assignee: Apache Spark >Priority: Major > > This change marks the StreamSinkProvider and StreamSourceProvider > interfaces as stable so that they can be relied on for compatibility for all of > Spark 2.x. > These interfaces have been available since Spark 2.0.0 and unchanged > since Spark 2.1.0. Additionally, the Kafka integration has been using them > since Spark 2.1.0. > Because Structured Streaming general availability was announced in > Spark 2.2.0, I suspect there are other third-party integrations using them > already as well.
[jira] [Assigned] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable
[ https://issues.apache.org/jira/browse/SPARK-26415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26415: Assignee: (was: Apache Spark) > Mark StreamSinkProvider and StreamSourceProvider as stable > -- > > Key: SPARK-26415 > URL: https://issues.apache.org/jira/browse/SPARK-26415 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Grant Henke >Priority: Major > > This change marks the StreamSinkProvider and StreamSourceProvider > interfaces as stable so that they can be relied on for compatibility for all of > Spark 2.x. > These interfaces have been available since Spark 2.0.0 and unchanged > since Spark 2.1.0. Additionally, the Kafka integration has been using them > since Spark 2.1.0. > Because Structured Streaming general availability was announced in > Spark 2.2.0, I suspect there are other third-party integrations using them > already as well.
[jira] [Created] (SPARK-26415) Mark StreamSinkProvider and StreamSourceProvider as stable
Grant Henke created SPARK-26415: --- Summary: Mark StreamSinkProvider and StreamSourceProvider as stable Key: SPARK-26415 URL: https://issues.apache.org/jira/browse/SPARK-26415 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.2.0 Reporter: Grant Henke This change marks the StreamSinkProvider and StreamSourceProvider interfaces as stable so that they can be relied on for compatibility for all of Spark 2.x. These interfaces have been available since Spark 2.0.0 and unchanged since Spark 2.1.0. Additionally, the Kafka integration has been using them since Spark 2.1.0. Because Structured Streaming general availability was announced in Spark 2.2.0, I suspect there are other third-party integrations using them already as well.
[jira] [Created] (SPARK-26414) Race between SparkContext and YARN AM can cause NPE in UI setup code
Marcelo Vanzin created SPARK-26414: -- Summary: Race between SparkContext and YARN AM can cause NPE in UI setup code Key: SPARK-26414 URL: https://issues.apache.org/jira/browse/SPARK-26414 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 2.4.0 Reporter: Marcelo Vanzin There's a super narrow race between the SparkContext and the AM startup code: - SC starts the AM and waits for it to go into running state - AM goes into running state, unblocking SC - AM sends the AmIpFilter config to SC, which adds the filter to the list first and then the filter configs - unblocked SC is in the middle of setting up the UI and sees only the filter, but not the configs Then you get this: {noformat} ERROR org.apache.spark.SparkContext - Error initializing SparkContext. java.lang.NullPointerException at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.init(AmIpFilter.java:81) at org.spark_project.jetty.servlet.FilterHolder.initialize(FilterHolder.java:139) at org.spark_project.jetty.servlet.ServletHandler.initialize(ServletHandler.java:881) at org.spark_project.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:349) at org.spark_project.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:778) at org.spark_project.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:262) at org.spark_project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) at org.apache.spark.ui.ServerInfo.addHandler(JettyUtils.scala:520) at org.apache.spark.ui.WebUI$$anonfun$attachHandler$1.apply(WebUI.scala:96) at org.apache.spark.ui.WebUI$$anonfun$attachHandler$1.apply(WebUI.scala:96) at scala.Option.foreach(Option.scala:257) at org.apache.spark.ui.WebUI.attachHandler(WebUI.scala:96) at org.apache.spark.SparkContext$$anonfun$22$$anonfun$apply$8.apply(SparkContext.scala:522) at org.apache.spark.SparkContext$$anonfun$22$$anonfun$apply$8.apply(SparkContext.scala:522) at scala.Option.foreach(Option.scala:257) {noformat}
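The interleaving in the steps above can be model-checked deterministically by pausing the publisher at each `yield` and letting a simulated UI-setup check run in between. This is a sketch under stated assumptions: the keys follow Spark 2.x's UI-filter convention (`spark.ui.filters` plus `spark.<filter class>.param.<name>`), the `PROXY_HOSTS` value is a placeholder, and `publish_params_first` is one hypothetical ordering fix, not the change that was actually made for this issue.

```python
from typing import Callable, Dict, Iterator, List

# Assumed key convention: "spark.ui.filters" lists filter classes and
# "spark.<filter class>.param.<name>" holds each filter's init parameters.
AM_FILTER = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
PARAM_PREFIX = f"spark.{AM_FILTER}.param."


def publish_filter_first(conf: Dict[str, str]) -> Iterator[None]:
    """Racy order described in the issue: register the filter, then its params."""
    conf["spark.ui.filters"] = AM_FILTER
    yield  # the UI-setup thread may run here
    conf[PARAM_PREFIX + "PROXY_HOSTS"] = "rm-host"  # placeholder param value
    yield


def publish_params_first(conf: Dict[str, str]) -> Iterator[None]:
    """Hypothetical safer order: params first, filter name last."""
    conf[PARAM_PREFIX + "PROXY_HOSTS"] = "rm-host"
    yield
    conf["spark.ui.filters"] = AM_FILTER
    yield


def ui_sees_unconfigured_filter(conf: Dict[str, str]) -> bool:
    """True if the UI would initialize the filter with no params (the NPE case)."""
    if AM_FILTER not in conf.get("spark.ui.filters", ""):
        return False
    return not any(key.startswith(PARAM_PREFIX) for key in conf)


def observe(publisher: Callable[[Dict[str, str]], Iterator[None]]) -> List[bool]:
    """Run the UI check before the first step and after every publish step."""
    conf: Dict[str, str] = {}
    seen = [ui_sees_unconfigured_filter(conf)]
    for _ in publisher(conf):
        seen.append(ui_sees_unconfigured_filter(conf))
    return seen


print(observe(publish_filter_first))  # [False, True, False]
print(observe(publish_params_first))  # [False, False, False]
```

The `True` in the first trace is exactly the window the report describes: the filter is listed but its configs are not there yet. Publishing the params before the filter name closes that window in this model, but whether that alone is sufficient in the real code depends on how SparkContext reads the conf.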
[jira] [Commented] (SPARK-21544) Test jar of some module should not install or deploy twice
[ https://issues.apache.org/jira/browse/SPARK-21544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725257#comment-16725257 ] Matt Foley commented on SPARK-21544: [~cane], I'm having exactly the problem described in SPARK-21544 with regard to spark-streaming in Spark 2.4.0. The changes in PR [#18745|https://github.com/apache/spark/pull/18745] fixed the problem with sql/catalyst and sql/core, but did not change streaming. Did you find a solution for it? Any help would be greatly appreciated. > Test jar of some module should not install or deploy twice > -- > > Key: SPARK-21544 > URL: https://issues.apache.org/jira/browse/SPARK-21544 > Project: Spark > Issue Type: Improvement > Components: Deploy >Affects Versions: 1.6.1, 2.1.0 >Reporter: zhoukang >Assignee: zhoukang >Priority: Minor > Fix For: 2.3.0 > > > For the modules below: > common/network-common > streaming > sql/core > sql/catalyst > tests.jar will be installed or deployed twice. For example: > {code:java} > [INFO] Installing > /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar > to > /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar > [DEBUG] Writing tracking file > /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/_remote.repositories > [DEBUG] Installing > org.apache.spark:spark-streaming_2.11:2.1.0-mdh2.1.0.1-SNAPSHOT/maven-metadata.xml > to > /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/maven-metadata-local.xml > [DEBUG] Installing org.apache.spark:spark-streaming_2.11/maven-metadata.xml > to > /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/maven-metadata-local.xml > [INFO] Installing > /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar > to
/home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar > [DEBUG] Skipped re-installing > /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar > to > /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar, > seems unchanged > {code} > The reason is below: > {code:java} > [DEBUG] (f) artifact = > org.apache.spark:spark-streaming_2.11:jar:2.1.0-mdh2.1.0.1-SNAPSHOT > [DEBUG] (f) attachedArtifacts = > [org.apache.spark:spark-streaming_2.11:test-jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, > org.apache.spark:spark-streaming_2.11:jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, > org.apache.spark:spark-streaming_2.11:java-source:sources:2.1.0-mdh2.1.0.1-SNAPSHOT, > org.apache.spark:spark-streaming_2.11:java-source:test-sources:2.1.0-mdh2.1.0.1-SNAPSHOT, > org.apache.spark:spark-streaming_2.11:javadoc:javadoc:2.1.0-mdh2.1.0.1-SNAPSHOT] > {code} > When executing 'mvn deploy' to Nexus during a release, this will fail, since artifacts in the release > Nexus repository cannot be overwritten.
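To make "the reason" concrete, the sketch below maps each attached coordinate from the debug log to the file name it would be installed as. It assumes the groupId:artifactId:type:classifier:version layout printed in the log and that the `test-jar`, `java-source` and `javadoc` types all use the `jar` extension (as Maven's standard artifact handlers do). `file_name` and `duplicated_files` are hypothetical helpers, not Maven APIs.

```python
from collections import Counter
from typing import List

# Assumed type -> file extension mapping (mirrors Maven's standard artifact handlers).
EXTENSIONS = {"jar": "jar", "test-jar": "jar", "java-source": "jar", "javadoc": "jar"}


def file_name(coordinate: str) -> str:
    """Map groupId:artifactId:type:classifier:version to the installed file name."""
    _group, artifact, artifact_type, classifier, version = coordinate.split(":")
    return f"{artifact}-{version}-{classifier}.{EXTENSIONS[artifact_type]}"


def duplicated_files(coordinates: List[str]) -> List[str]:
    """Return file names that more than one attached artifact would be written to."""
    counts = Counter(file_name(c) for c in coordinates)
    return sorted(name for name, n in counts.items() if n > 1)


VERSION = "2.1.0-mdh2.1.0.1-SNAPSHOT"
ATTACHED = [
    f"org.apache.spark:spark-streaming_2.11:{kind}:{VERSION}"
    for kind in ("test-jar:tests", "jar:tests", "java-source:sources",
                 "java-source:test-sources", "javadoc:javadoc")
]

print(duplicated_files(ATTACHED))
# ['spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar']
```

Both `test-jar:tests` and `jar:tests` resolve to the same `-tests.jar` file, which matches the doubled "Installing ... -tests.jar" lines above; a release repository that rejects redeployment then fails on the second upload.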
[jira] [Created] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark
Richard Whitcomb created SPARK-26413: Summary: SPIP: RDD Arrow Support in Spark Core and PySpark Key: SPARK-26413 URL: https://issues.apache.org/jira/browse/SPARK-26413 Project: Spark Issue Type: Improvement Components: PySpark, Spark Core Affects Versions: 3.0.0 Reporter: Richard Whitcomb h2. Background and Motivation Arrow is becoming a standard interchange format for columnar structured data. This is already true in Spark with the use of Arrow in Pandas UDFs in the DataFrame API. However, the current implementation of Arrow in Spark is limited to two use cases. * Pandas UDFs, which allow operations on one or more columns in the DataFrame API. * Collect as Pandas, which pulls the entire dataset back to the driver as a Pandas DataFrame. What is still hard, however, is making use of all of the columns in a DataFrame while staying distributed across the workers. Currently, the only way to do this is to drop down into RDDs and collect the rows into a dataframe. However, pickling is very slow and collecting is expensive. The proposal is to extend Spark in a way that allows users to operate fully on an Arrow Table while still making use of Spark's underlying technology. Some examples of what this new API makes possible: * Pass the Arrow Table with zero copy to PyTorch for predictions. * Pass it to NVIDIA RAPIDS for an algorithm to be run on the GPU. * Distribute data across many GPUs, making use of the new Barrier API. h2. Target users and personas ML, data scientists, and future library authors. h2. Goals * Conversion from any Dataset[Row] or PySpark DataFrame to RDD[Table] * Conversion back from any RDD[Table] to Dataset[Row], RDD[Row], or PySpark DataFrame * Open up possibilities for tighter integration between Arrow, Pandas and Spark, especially at the library level. h2. Non-Goals * Not creating a new API but instead using existing APIs. h2. Proposed API changes h3. Data Objects case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch]) h3.
Dataset.scala {code:java} // Converts a Dataset to an RDD of Arrow Tables // Each RDD row is an Iterable of Arrow Batches. def arrowRDD: RDD[ArrowTable] // Utility function to convert to an RDD of Arrow Tables for PySpark private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]] {code} h3. RDD.scala {code:java} // Converts RDD[ArrowTable] to a DataFrame by inspecting the Arrow Schema def arrowToDataframe(implicit ev: T <:< ArrowTable): DataFrame // Converts RDD[ArrowTable] to an RDD of Rows def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]{code} h3. Serializers.py {code} # Serializer that takes serialized Arrow Tables and returns a pyarrow Table. class ArrowSerializer(FramedSerializer) {code} h3. RDD.py {code} # New RDD class that has an RDD[ArrowTable] behind it and uses the new ArrowSerializer instead of the normal Pickle Serializer class ArrowRDD(RDD){code} h3. Dataframe.py {code} # New function that converts a PySpark DataFrame into an ArrowRDD def arrow(self): {code} h2. Example API Usage h3. PySpark {code} # Select a Single Column Using Pandas def map_table(arrow_table): import pyarrow as pa pdf = arrow_table.to_pandas() pdf = pdf[['email']] return pa.Table.from_pandas(pdf) # Convert to Arrow RDD, map over tables, convert back to dataframe df.arrow.map(map_table).dataframe {code} h3. Scala {code:java} // Find N Centroids using CUDA RAPIDS k-means def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable // Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row] df.arrowRDD.map(table => runCuKmeans(table, N)).arrowToDataframe.show(10) {code} h2. Implementation Details As mentioned in the first section, the goal is to make it easier for Spark users to interact with Arrow tools and libraries. This does, however, come with some considerations from a Spark perspective. Arrow is column-based rather than row-based. In the API proposed above, each row of an RDD[ArrowTable] will in fact be a block of data.
Another proposal in this regard is to introduce a new Spark parameter called arrow.sql.execution.arrow.maxRecordsPerTable, which decides how many records are included in a single Arrow Table. If it is set to -1, the entire partition is included in one table; otherwise each table holds at most that number of records. Within each table, Arrow's normal batching mechanism is used to split the records into multiple batches, whose size is still dictated by arrowMaxRecordsPerBatch.
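The proposed table/batch split can be sketched in plain Python over a list of rows, assuming that maxRecordsPerTable caps the rows per table (with -1 meaning the whole partition) and that maxRecordsPerBatch caps the rows per batch inside each table. `chunk` and `group_partition` are hypothetical helpers; no Spark or Arrow API is involved.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunk(rows: Sequence[T], size: int) -> List[List[T]]:
    """Split rows into consecutive pieces of at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [list(rows[i:i + size]) for i in range(0, len(rows), size)]


def group_partition(rows: Sequence[T], max_records_per_table: int,
                    max_records_per_batch: int) -> List[List[List[T]]]:
    """Return the tables for one partition; each table is a list of record batches."""
    if not rows:
        return []
    # -1 means "put the entire partition into a single table".
    table_size = len(rows) if max_records_per_table == -1 else max_records_per_table
    return [chunk(table, max_records_per_batch) for table in chunk(rows, table_size)]


tables = group_partition(list(range(10)), max_records_per_table=4, max_records_per_batch=3)
print([[len(batch) for batch in table] for table in tables])  # [[3, 1], [3, 1], [2]]
```

With max_records_per_table=-1 and max_records_per_batch=4, the same ten rows would instead become a single table of three batches (4, 4 and 2 rows).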
[jira] [Assigned] (SPARK-26402) Canonicalization on GetStructField
[ https://issues.apache.org/jira/browse/SPARK-26402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26402: Assignee: DB Tsai (was: Apache Spark) > Canonicalization on GetStructField > -- > > Key: SPARK-26402 > URL: https://issues.apache.org/jira/browse/SPARK-26402 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: DB Tsai >Assignee: DB Tsai >Priority: Major > > GetStructField with different optional names should be semantically equal. We > will use this as a building block to compare the nested fields used in the > plans to be optimized by the Catalyst optimizer.
[jira] [Assigned] (SPARK-26402) Canonicalization on GetStructField
[ https://issues.apache.org/jira/browse/SPARK-26402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26402: Assignee: Apache Spark (was: DB Tsai) > Canonicalization on GetStructField > -- > > Key: SPARK-26402 > URL: https://issues.apache.org/jira/browse/SPARK-26402 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: DB Tsai >Assignee: Apache Spark >Priority: Major > > GetStructField with different optional names should be semantically equal. We > will use this as a building block to compare the nested fields used in the > plans to be optimized by the Catalyst optimizer.
[jira] [Commented] (SPARK-26402) Canonicalization on GetStructField
[ https://issues.apache.org/jira/browse/SPARK-26402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725256#comment-16725256 ] ASF GitHub Bot commented on SPARK-26402: dbtsai opened a new pull request #23353: [SPARK-26402][SQL] Canonicalization on GetStructField URL: https://github.com/apache/spark/pull/23353 ## What changes were proposed in this pull request? GetStructField with different optional names should be semantically equal. We will use this as a building block to compare the nested fields used in the plans to be optimized by the Catalyst optimizer. ## How was this patch tested? New tests are added. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Canonicalization on GetStructField > -- > > Key: SPARK-26402 > URL: https://issues.apache.org/jira/browse/SPARK-26402 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: DB Tsai >Assignee: DB Tsai >Priority: Major > > GetStructField with different optional names should be semantically equal. We > will use this as a building block to compare the nested fields used in the > plans to be optimized by the Catalyst optimizer.
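The idea can be illustrated with a small Python model, assuming (as in Catalyst, where GetStructField takes a child, an ordinal and an optional name) that the field is resolved by its ordinal and the name is only an optional hint. Canonicalizing drops the name, recursively, so two accesses to the same ordinal of the same child compare equal. `Attribute`, `GetStructField`, `canonicalize` and `semantic_equals` here are hypothetical stand-ins, not Catalyst classes.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Attribute:
    """Stand-in for a resolved column reference (hypothetical)."""
    name: str
    expr_id: int


@dataclass(frozen=True)
class GetStructField:
    """Stand-in for a struct field access: child, ordinal, optional name."""
    child: object
    ordinal: int
    name: Optional[str] = None


def canonicalize(expr):
    """Drop the optional field name, recursing into nested field accesses."""
    if isinstance(expr, GetStructField):
        return replace(expr, child=canonicalize(expr.child), name=None)
    return expr


def semantic_equals(a, b) -> bool:
    return canonicalize(a) == canonicalize(b)


person = Attribute("person", expr_id=1)
a = GetStructField(person, ordinal=0, name="first_name")
b = GetStructField(person, ordinal=0)
print(a == b, semantic_equals(a, b))  # False True
```

Structural equality still distinguishes the two expressions, while the canonical forms agree; that is the property an optimizer rule would need in order to recognize that two plans reference the same nested field.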
[jira] [Commented] (SPARK-26388) No support for "alter table .. replace columns" to drop columns
[ https://issues.apache.org/jira/browse/SPARK-26388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725244#comment-16725244 ] nirav patel commented on SPARK-26388: - Updated the description with the requested details and a sample. > No support for "alter table .. replace columns" to drop columns > --- > > Key: SPARK-26388 > URL: https://issues.apache.org/jira/browse/SPARK-26388 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1, 2.3.1, 2.3.2 >Reporter: nirav patel >Priority: Major > > Looks like Hive {{replace columns}} is not working with Spark 2.2.1 and 2.3.1 > > create table myschema.mytable(a int, b int, c int) > alter table myschema.mytable replace columns (a int,b int,d int) > > *Expected Behavior* > It should drop column c and add column d. > alter table ... replace columns ... should work just as it does in Hive. > It replaces the existing columns with new ones, deleting any column that is not > mentioned. > > Here's the snippet from the Hive CLI: > hive> desc mytable; > OK > a int > b int > c int > Time taken: 0.05 seconds, Fetched: 3 row(s) > hive> alter table mytable replace columns(a int, b int, d int); > OK > Time taken: 0.078 seconds > hive> desc mytable; > OK > a int > b int > d int > Time taken: 0.03 seconds, Fetched: 3 row(s) > > *Actual Result* > Exception in thread "main" > org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: > alter table replace columns > {{ADD COLUMNS}} works; it seems to have been reported and fixed previously as well: > https://issues.apache.org/jira/browse/SPARK-18893 > > Replace columns should be supported as well. AFAIK, that's the only way to > delete Hive columns. > > > It is supposed to work according to these docs: > > [https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns] > > [https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features] > > but it's throwing an error for me on 2 different versions.
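The expected Hive behavior in the sample can be stated as a small schema transformation: the column list given to REPLACE COLUMNS becomes the complete new column list, so any existing column that is not listed is dropped. The helper below is a hypothetical model of that expectation only; it does not touch Spark, Hive or the metastore.

```python
from typing import Dict, List, Tuple

Schema = List[Tuple[str, str]]  # (column name, type)


def replace_columns(old: Schema, new: Schema) -> Dict[str, object]:
    """Model ALTER TABLE ... REPLACE COLUMNS: `new` becomes the whole column list."""
    old_names = [name for name, _ in old]
    new_names = [name for name, _ in new]
    return {
        "schema": list(new),
        "dropped": [n for n in old_names if n not in new_names],
        "added": [n for n in new_names if n not in old_names],
    }


before = [("a", "int"), ("b", "int"), ("c", "int")]
result = replace_columns(before, [("a", "int"), ("b", "int"), ("d", "int")])
print(result["dropped"], result["added"])  # ['c'] ['d']
```

This matches the Hive CLI output above, where `desc mytable` lists a, b, d after the statement.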
[jira] [Updated] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-26412: Target Version/s: 3.0.0 > Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition > -- > > Key: SPARK-26412 > URL: https://issues.apache.org/jira/browse/SPARK-26412 > Project: Spark > Issue Type: New Feature > Components: PySpark >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Priority: Major > > Pandas UDF is the ideal connection between PySpark and DL model inference > workloads. However, users need to load the model file first to make > predictions. It is common to see models of size ~100MB or bigger. If the > Pandas UDF execution is limited to batch scope, users need to repeatedly load > the same model for every batch in the same Python worker process, which is > inefficient. I created this JIRA to discuss possible solutions. > Essentially we need to support "start()" and "finish()" besides "apply". We > can either provide those interfaces or simply give users an iterator of > batches in pd.DataFrame and let user code handle it. > cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]
[jira] [Updated] (SPARK-26388) No support for "alter table .. replace columns" to drop columns
[ https://issues.apache.org/jira/browse/SPARK-26388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nirav patel updated SPARK-26388: Description: Looks like Hive {{replace columns}} is not working with Spark 2.2.1 and 2.3.1 create table myschema.mytable(a int, b int, c int) alter table myschema.mytable replace columns (a int,b int,d int) *Expected Behavior* It should drop column c and add column d. alter table ... replace columns ... should work just as it does in Hive. It replaces the existing columns with new ones, deleting any column that is not mentioned. Here's the snippet from the Hive CLI: hive> desc mytable; OK a int b int c int Time taken: 0.05 seconds, Fetched: 3 row(s) hive> alter table mytable replace columns(a int, b int, d int); OK Time taken: 0.078 seconds hive> desc mytable; OK a int b int d int Time taken: 0.03 seconds, Fetched: 3 row(s) *Actual Result* Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: alter table replace columns {{ADD COLUMNS}} works; it seems to have been reported and fixed previously as well: https://issues.apache.org/jira/browse/SPARK-18893 Replace columns should be supported as well. AFAIK, that's the only way to delete Hive columns. It is supposed to work according to these docs: [https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns] [https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features] but it's throwing an error for me on 2 different versions.
was: Looks like Hive {{replace columns}} is not working with Spark 2.2.1 and 2.3.1 {{alterSchemaSql : alter table myschema.mytable replace columns (a int,b int,d int) Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: alter table replace columns(line 2, pos 6) }} {{ADD COLUMNS}} works; it seems to have been reported and fixed previously as well: https://issues.apache.org/jira/browse/SPARK-18893 Replace columns should be supported as well. AFAIK, that's the only way to delete Hive columns. It is supposed to work according to these docs: [https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns] [https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features] but it's throwing an error for me on 2 different versions. > No support for "alter table .. replace columns" to drop columns > --- > > Key: SPARK-26388 > URL: https://issues.apache.org/jira/browse/SPARK-26388 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1, 2.3.1, 2.3.2 >Reporter: nirav patel >Priority: Major > > Looks like Hive {{replace columns}} is not working with Spark 2.2.1 and 2.3.1 > > create table myschema.mytable(a int, b int, c int) > alter table myschema.mytable replace columns (a int,b int,d int) > > *Expected Behavior* > It should drop column c and add column d. > alter table ... replace columns ... should work just as it does in Hive. > It replaces the existing columns with new ones, deleting any column that is not > mentioned.
> > Here's the snippet from the Hive CLI: > hive> desc mytable; > OK > a int > b int > c int > Time taken: 0.05 seconds, Fetched: 3 row(s) > hive> alter table mytable replace columns(a int, b int, d int); > OK > Time taken: 0.078 seconds > hive> desc mytable; > OK > a int > b int > d int > Time taken: 0.03 seconds, Fetched: 3 row(s) > > *Actual Result* > Exception in thread "main" > org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: > alter table replace columns > {{ADD COLUMNS}} works; it seems to have been reported and fixed previously as well: > https://issues.apache.org/jira/browse/SPARK-18893 > > Replace columns should be supported as well. AFAIK, that's the only way to > delete Hive columns. > > > It is supposed to work according to these docs: > > [https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns] > >
[jira] [Created] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition
Xiangrui Meng created SPARK-26412: - Summary: Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition Key: SPARK-26412 URL: https://issues.apache.org/jira/browse/SPARK-26412 Project: Spark Issue Type: New Feature Components: PySpark Affects Versions: 3.0.0 Reporter: Xiangrui Meng Pandas UDF is the ideal connection between PySpark and DL model inference workloads. However, users need to load the model file first to make predictions. It is common to see models of size ~100MB or bigger. If the Pandas UDF execution is limited to batch scope, users need to repeatedly load the same model for every batch in the same Python worker process, which is inefficient. I created this JIRA to discuss possible solutions. Essentially we need to support "start()" and "finish()" besides "apply". We can either provide those interfaces or simply give users an iterator of batches in pd.DataFrame and let user code handle it. cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]
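The cost difference between batch-scoped and partition-scoped execution can be sketched without Spark. In this illustration a hypothetical `ModelLoader` counts how often the expensive load runs, and plain lists stand in for pd.DataFrame batches. The iterator-based function loads once before the first batch ("start()") and releases at the end ("finish()"). This models the idea in the issue only; it is not an existing or proposed PySpark API.

```python
from typing import Callable, Iterable, Iterator, List

Batch = List[float]


class ModelLoader:
    """Hypothetical stand-in for loading a large model file; counts loads."""

    def __init__(self) -> None:
        self.loads = 0
        self.released = False

    def load(self) -> Callable[[float], float]:
        self.loads += 1
        return lambda x: 2.0 * x  # toy "model"

    def release(self) -> None:
        self.released = True


def per_batch_udf(loader: ModelLoader, batch: Batch) -> Batch:
    """Batch-scoped UDF: the model has to be reloaded for every batch."""
    model = loader.load()
    return [model(x) for x in batch]


def per_partition_udf(loader: ModelLoader, batches: Iterable[Batch]) -> Iterator[Batch]:
    """Iterator-based UDF: load once ("start"), stream all batches, then "finish"."""
    model = loader.load()
    try:
        for batch in batches:
            yield [model(x) for x in batch]
    finally:
        loader.release()


partition = [[1.0, 2.0], [3.0], [4.0, 5.0]]
a, b = ModelLoader(), ModelLoader()
out_a = [per_batch_udf(a, batch) for batch in partition]
out_b = list(per_partition_udf(b, partition))
print(out_a == out_b, a.loads, b.loads)  # True 3 1
```

Both variants produce the same predictions, but with three batches the batch-scoped version loads the model three times, while the iterator version loads it once per partition.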
[jira] [Updated] (SPARK-26410) Support per Pandas UDF configuration
[ https://issues.apache.org/jira/browse/SPARK-26410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiangrui Meng updated SPARK-26410: -- Description: We use a "maxRecordsPerBatch" conf to control the batch sizes. However, the "right" batch size usually depends on the task itself. It would be nice if users could configure the batch size when they declare the Pandas UDF. This is orthogonal to SPARK-23258 (using max buffer size instead of row count). Besides the API, we should also discuss how to merge Pandas UDFs with different configurations. For example, {code} df.select(predict1(col("features")), predict2(col("features"))) {code} where predict1 requests 100 rows per batch, while predict2 requests 120 rows per batch. cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator] was: We use a "maxRecordsPerBatch" conf to control the batch sizes. However, the "right" batch size usually depends on the task itself. It would be nice if users could configure the batch size when they declare the Pandas UDF. This is orthogonal to SPARK-23258 (using max buffer size instead of row count). Besides the API, we should also discuss how to merge Pandas UDFs with different configurations. For example, {code} df.select(predict1(col("features")), predict2(col("features"))) {code} where predict1 requests 100 rows per batch, while predict2 requests 120 rows per batch. cc: [~icexelloss] [~bryanc] [~holdenk] [~ueshin] [~smilegator] > Support per Pandas UDF configuration > > > Key: SPARK-26410 > URL: https://issues.apache.org/jira/browse/SPARK-26410 > Project: Spark > Issue Type: New Feature > Components: PySpark >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Priority: Major > > We use a "maxRecordsPerBatch" conf to control the batch sizes. However, the > "right" batch size usually depends on the task itself. It would be nice if > users could configure the batch size when they declare the Pandas UDF.
> This is orthogonal to SPARK-23258 (using max buffer size instead of row > count). > Besides API, we should also discuss how to merge Pandas UDFs of different > configurations. For example, > {code} > df.select(predict1(col("features"), predict2(col("features"))) > {code} > when predict1 requests 100 rows per batch, while predict2 requests 120 rows > per batch. > cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
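The merging question can be illustrated with a small plain-Python sketch that splits one partition's rows into batches of the size each UDF requests. `rebatch` and `boundaries` are hypothetical helpers, and the 250-row partition is an arbitrary example. The sketch only shows that the batch boundaries for 100 and 120 rows diverge, so a single shared batch cannot satisfy both requests as-is.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def rebatch(rows: Iterable[int], batch_size: int) -> Iterator[List[int]]:
    """Yield consecutive batches of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def boundaries(sizes: List[int]) -> List[int]:
    """Row offsets at which a new batch starts (excluding offset 0)."""
    out, total = [], 0
    for size in sizes[:-1]:
        total += size
        out.append(total)
    return out


rows = range(250)  # one hypothetical partition
sizes_predict1 = [len(b) for b in rebatch(rows, 100)]
sizes_predict2 = [len(b) for b in rebatch(rows, 120)]
shared = set(boundaries(sizes_predict1)) & set(boundaries(sizes_predict2))
```

With these numbers, predict1 sees batches of 100, 100, and 50 rows, while predict2 sees 120, 120, and 10, and no batch boundary is shared. The ticket leaves the merge policy open; this sketch only makes the conflict visible.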
[jira] [Updated] (SPARK-26409) SQLConf should be serializable in test sessions
[ https://issues.apache.org/jira/browse/SPARK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang updated SPARK-26409: --- Description: `SQLConf` is supposed to be serializable. However, it is currently not serializable in `WithTestConf`: `WithTestConf` uses the method `overrideConfs` in a closure, while the classes that implement it (`TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder`) are not serializable. Use a local variable to fix it. was: Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the method `overrideConfs` in a closure, while the classes `TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not serializable. This PR uses a local variable to fix it. > SQLConf should be serializable in test sessions > --- > > Key: SPARK-26409 > URL: https://issues.apache.org/jira/browse/SPARK-26409 > Project: Spark > Issue Type: Improvement > Components: SQL, Tests >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Priority: Major
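The fix described here follows a common closure-capture pattern. A closure that reads a member through `this` captures the whole enclosing object, whereas copying the member into a local variable first captures only that value. The following Python analogy (not Spark's Scala code, and using `pickle` rather than Java serialization) shows the same effect. `SessionStateBuilder` is a hypothetical stand-in, and its `threading.Lock` attribute plays the role of the non-serializable builder state.

```python
import pickle
import threading


class SessionStateBuilder:
    """Hypothetical stand-in for a non-serializable session state builder."""

    def __init__(self):
        # A lock cannot be pickled, so neither can this object.
        self._lock = threading.Lock()
        self.override_confs = {"spark.sql.shuffle.partitions": "5"}

    def getter_capturing_self(self):
        # The lambda reads self.override_confs, so it closes over `self`.
        return lambda key: self.override_confs[key]

    def getter_capturing_local(self):
        # Copy the reference into a local first; the lambda then closes
        # over `confs` only, not over the builder.
        confs = self.override_confs
        return lambda key: confs[key]


def captured_values(fn):
    """Values a Python closure holds in its free-variable cells."""
    return [cell.cell_contents for cell in (fn.__closure__ or ())]


def is_picklable(obj):
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


builder = SessionStateBuilder()
self_capture_ok = all(is_picklable(v) for v in captured_values(builder.getter_capturing_self()))
local_capture_ok = all(is_picklable(v) for v in captured_values(builder.getter_capturing_local()))
```

Both getters return the same values, but only the local-variable version avoids dragging the builder along with the closure.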
[jira] [Created] (SPARK-26411) Streaming: _spark_metadata and checkpoints out of sync cause checkpoint packing failure
Alexander Panzhin created SPARK-26411: - Summary: Streaming: _spark_metadata and checkpoints out of sync cause checkpoint packing failure Key: SPARK-26411 URL: https://issues.apache.org/jira/browse/SPARK-26411 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.3.0 Reporter: Alexander Panzhin A Spark Structured Streaming query from a File source to a File sink seems to pick up information from the `_spark_metadata` directory for checkpoint data compaction. Worse, when the output and the checkpoint are out of sync, data is not written. *This is not documented anywhere. Removing checkpoint data and leaving _spark_metadata in the output directory WILL CAUSE data loss.* FileSourceScanExec.createNonBucketedReadRDD kicks off compaction and fails the whole job because it expects deltas to be present. But the delta files are never written, because FileStreamSink.addBatch doesn't execute the DataFrame that it receives. {code:java} ... INFO [2018-12-17 03:20:02,784] org.apache.spark.sql.execution.streaming.FileStreamSink: Skipping already committed batch 75 ... INFO [2018-12-17 03:30:01,691] org.apache.spark.sql.execution.streaming.FileStreamSource: Log offset set to 76 with 29 new files INFO [2018-12-17 03:30:01,700] org.apache.spark.sql.execution.streaming.MicroBatchExecution: Committed offsets for batch 76.
Metadata OffsetSeqMetadata(0,1545017401691,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider)) INFO [2018-12-17 03:30:01,704] org.apache.spark.sql.execution.streaming.FileStreamSource: Processing 29 files from 76:76 INFO [2018-12-17 03:30:01,983] org.apache.spark.sql.execution.datasources.FileSourceStrategy: Pruning directories with: INFO [2018-12-17 03:30:01,983] org.apache.spark.sql.execution.datasources.FileSourceStrategy: Post-Scan Filters: INFO [2018-12-17 03:30:01,984] org.apache.spark.sql.execution.datasources.FileSourceStrategy: Output Data Schema: struct INFO [2018-12-17 03:30:01,984] org.apache.spark.sql.execution.FileSourceScanExec: Pushed Filters: INFO [2018-12-17 03:30:02,581] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 16.205011 ms INFO [2018-12-17 03:30:02,593] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 9.368244 ms INFO [2018-12-17 03:30:02,629] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 31.126375 ms INFO [2018-12-17 03:30:02,640] org.apache.spark.SparkContext: Created broadcast 86 from start at SourceStream.scala:55 INFO [2018-12-17 03:30:02,643] org.apache.spark.sql.execution.FileSourceScanExec: Planning scan with bin packing, max size: 14172786 bytes, open cost is considered as scanning 4194304 bytes. 
INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4321 INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4326 INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4324 INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4320 INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4325 INFO [2018-12-17 03:30:02,737] org.apache.spark.SparkContext: Created broadcast 87 from start at SourceStream.scala:55 INFO [2018-12-17 03:30:02,756] org.apache.spark.SparkContext: Starting job: start at SourceStream.scala:55 INFO [2018-12-17 03:30:02,761] org.apache.spark.SparkContext: Created broadcast 88 from broadcast at DAGScheduler.scala:1079 INFO [2018-12-17 03:30:03,860] org.apache.spark.ExecutorAllocationManager: Requesting 3 new executors because tasks are backlogged (new desired total will be 3) INFO [2018-12-17 03:30:04,863] org.apache.spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 4) INFO [2018-12-17 03:30:06,545] org.apache.spark.SparkContext: Created broadcast 89 from broadcast at DAGScheduler.scala:1079 WARN [2018-12-17 03:30:07,214] org.apache.spark.scheduler.TaskSetManager: Lost task 19.0 in stage 87.0 (TID 6145, ip-10-172-18-94.ec2.internal, executor 1): java.lang.IllegalStateException: Error reading delta file hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19/1.delta of HDFSStateStoreProvider[id = (op=0,part=19),dir = hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19]: hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19/1.delta does not exist at
[jira] [Commented] (SPARK-26390) ColumnPruning rule should only do column pruning
[ https://issues.apache.org/jira/browse/SPARK-26390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725216#comment-16725216 ] ASF GitHub Bot commented on SPARK-26390: asfgit closed pull request #23343: [SPARK-26390][SQL] ColumnPruning rule should only do column pruning URL: https://github.com/apache/spark/pull/23343 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3eb6bca6ec976..44d5543114902 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -93,7 +93,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) RewriteCorrelatedScalarSubquery, EliminateSerialization, RemoveRedundantAliases, -RemoveRedundantProject, +RemoveNoopOperators, SimplifyExtractValueOps, CombineConcats) ++ extendedOperatorOptimizationRules @@ -177,7 +177,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) RewritePredicateSubquery, ColumnPruning, CollapseProject, - RemoveRedundantProject) :+ + RemoveNoopOperators) :+ Batch("UpdateAttributeReferences", Once, UpdateNullabilityInAttributeReferences) } @@ -403,11 +403,15 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { } /** - * Remove projections from the query plan that do not make any modifications. + * Remove no-op operators from the query plan that do not make any modifications. 
*/ -object RemoveRedundantProject extends Rule[LogicalPlan] { +object RemoveNoopOperators extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case p @ Project(_, child) if p.output == child.output => child +// Eliminate no-op Projects +case p @ Project(_, child) if child.sameOutput(p) => child + +// Eliminate no-op Window +case w: Window if w.windowExpressions.isEmpty => w.child } } @@ -602,17 +606,12 @@ object ColumnPruning extends Rule[LogicalPlan] { p.copy(child = w.copy( windowExpressions = w.windowExpressions.filter(p.references.contains))) -// Eliminate no-op Window -case w: Window if w.windowExpressions.isEmpty => w.child - -// Eliminate no-op Projects -case p @ Project(_, child) if child.sameOutput(p) => child - // Can't prune the columns on LeafNode case p @ Project(_, _: LeafNode) => p // for all other logical plans that inherits the output from it's children -case p @ Project(_, child) => +// Project over project is handled by the first case, skip it here. 
+case p @ Project(_, child) if !child.isInstanceOf[Project] => val required = child.references ++ p.references if (!child.inputSet.subsetOf(required)) { val newChildren = child.children.map(c => prunedChild(c, required)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 8d7c9bf220bc2..57195d5fda7c5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -34,6 +34,7 @@ class ColumnPruningSuite extends PlanTest { val batches = Batch("Column pruning", FixedPoint(100), PushDownPredicate, ColumnPruning, + RemoveNoopOperators, CollapseProject) :: Nil } @@ -340,10 +341,8 @@ class ColumnPruningSuite extends PlanTest { test("Column pruning on Union") { val input1 = LocalRelation('a.int, 'b.string, 'c.double) val input2 = LocalRelation('c.int, 'd.string, 'e.double) -val query = Project('b :: Nil, - Union(input1 :: input2 :: Nil)).analyze -val expected = Project('b :: Nil, - Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze +val query = Project('b :: Nil, Union(input1 :: input2 :: Nil)).analyze +val expected = Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil).analyze comparePlans(Optimize.execute(query), expected) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index ef4b848924f06..b190dd5a7c220 100644 ---
[jira] [Resolved] (SPARK-26390) ColumnPruning rule should only do column pruning
[ https://issues.apache.org/jira/browse/SPARK-26390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-26390. --- Resolution: Fixed Fix Version/s: 3.0.0 This is resolved via https://github.com/apache/spark/pull/23343 > ColumnPruning rule should only do column pruning > > > Key: SPARK-26390 > URL: https://issues.apache.org/jira/browse/SPARK-26390 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Fix For: 3.0.0
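The two cases that PR #23343 moves from `ColumnPruning` into `RemoveNoopOperators` can be mimicked on a toy plan tree. This is a Python sketch, not Catalyst. `Relation`, `Project`, and `Window` are simplified stand-ins, output equality compares plain column names instead of calling `sameOutput` on attributes, and the rewrite is applied bottom-up for simplicity.

```python
from dataclasses import dataclass, replace
from typing import Tuple, Union


@dataclass(frozen=True)
class Relation:
    """Leaf node exposing a fixed list of column names."""
    columns: Tuple[str, ...]

    @property
    def output(self) -> Tuple[str, ...]:
        return self.columns


@dataclass(frozen=True)
class Project:
    project_list: Tuple[str, ...]
    child: "Plan"

    @property
    def output(self) -> Tuple[str, ...]:
        return self.project_list


@dataclass(frozen=True)
class Window:
    window_expressions: Tuple[str, ...]
    child: "Plan"

    @property
    def output(self) -> Tuple[str, ...]:
        return self.child.output + self.window_expressions


Plan = Union[Relation, Project, Window]


def remove_noop_operators(plan: Plan) -> Plan:
    # Rewrite the child first (bottom-up), so stacked no-ops collapse in one pass.
    if isinstance(plan, (Project, Window)):
        plan = replace(plan, child=remove_noop_operators(plan.child))
    # Eliminate no-op Project: it outputs exactly what its child outputs.
    if isinstance(plan, Project) and plan.output == plan.child.output:
        return plan.child
    # Eliminate no-op Window: it has no window expressions.
    if isinstance(plan, Window) and not plan.window_expressions:
        return plan.child
    return plan


rel = Relation(("a", "b"))
nested = Project(("a", "b"), Window((), Project(("a", "b"), rel)))
pruning = Project(("a",), rel)  # drops column b, so it is not a no-op
```

The nested no-op Projects and the empty Window both collapse to `rel`. A Project that actually prunes a column is kept, which is exactly the work left to `ColumnPruning` after the split.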
[jira] [Created] (SPARK-26410) Support per Pandas UDF configuration
Xiangrui Meng created SPARK-26410: - Summary: Support per Pandas UDF configuration Key: SPARK-26410 URL: https://issues.apache.org/jira/browse/SPARK-26410 Project: Spark Issue Type: New Feature Components: PySpark Affects Versions: 3.0.0 Reporter: Xiangrui Meng We use a "maxRecordsPerBatch" conf to control the batch sizes. However, the "right" batch size usually depends on the task itself. It would be nice if users could configure the batch size when they declare the Pandas UDF. This is orthogonal to SPARK-23258 (using max buffer size instead of row count). Besides the API, we should also discuss how to merge Pandas UDFs with different configurations. For example, {code} df.select(predict1(col("features")), predict2(col("features"))) {code} where predict1 requests 100 rows per batch while predict2 requests 120 rows per batch. cc: [~icexelloss] [~bryanc] [~holdenk] [~ueshin] [~smilegator]
[jira] [Assigned] (SPARK-26409) SQLConf should be serializable in test sessions
[ https://issues.apache.org/jira/browse/SPARK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26409: Assignee: Apache Spark > SQLConf should be serializable in test sessions > --- > > Key: SPARK-26409 > URL: https://issues.apache.org/jira/browse/SPARK-26409 > Project: Spark > Issue Type: Improvement > Components: SQL, Tests >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Assignee: Apache Spark >Priority: Major > > Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the > method `overrideConfs` in a closure, while the classes > `TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not > serializable. > This PR uses a local variable to fix it.
[jira] [Assigned] (SPARK-26409) SQLConf should be serializable in test sessions
[ https://issues.apache.org/jira/browse/SPARK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26409: Assignee: (was: Apache Spark) > SQLConf should be serializable in test sessions > --- > > Key: SPARK-26409 > URL: https://issues.apache.org/jira/browse/SPARK-26409 > Project: Spark > Issue Type: Improvement > Components: SQL, Tests >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Priority: Major > > Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the > method `overrideConfs` in a closure, while the classes > `TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not > serializable. > This PR uses a local variable to fix it.
[jira] [Commented] (SPARK-26409) SQLConf should be serializable in test sessions
[ https://issues.apache.org/jira/browse/SPARK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725190#comment-16725190 ] ASF GitHub Bot commented on SPARK-26409: gengliangwang opened a new pull request #23352: [SPARK-26409][SQL][TESTS] SQLConf should be serializable in test sessions URL: https://github.com/apache/spark/pull/23352 ## What changes were proposed in this pull request? Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the method `overrideConfs` in a closure, while the classes `TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not serializable. This PR uses a local variable to fix it. ## How was this patch tested? Added a unit test. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > SQLConf should be serializable in test sessions > --- > > Key: SPARK-26409 > URL: https://issues.apache.org/jira/browse/SPARK-26409 > Project: Spark > Issue Type: Improvement > Components: SQL, Tests >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Priority: Major > > Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the > method `overrideConfs` in a closure, while the classes > `TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not > serializable. > This PR uses a local variable to fix it.
[jira] [Created] (SPARK-26409) SQLConf should be serializable in test sessions
Gengliang Wang created SPARK-26409: -- Summary: SQLConf should be serializable in test sessions Key: SPARK-26409 URL: https://issues.apache.org/jira/browse/SPARK-26409 Project: Spark Issue Type: Improvement Components: SQL, Tests Affects Versions: 3.0.0 Reporter: Gengliang Wang Currently the `SQLConf` in `WithTestConf` is not serializable. It uses the method `overrideConfs` in a closure, while the classes `TestHiveSessionStateBuilder` and `TestSQLSessionStateBuilder` are not serializable. This PR uses a local variable to fix it.
[jira] [Created] (SPARK-26408) java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347)
Amit Siddhu created SPARK-26408: --- Summary: java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) Key: SPARK-26408 URL: https://issues.apache.org/jira/browse/SPARK-26408 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.2 Environment: Spark version 2.3.2 Scala version 2.11.8 Hbase version 1.4.7 Reporter: Amit Siddhu {code:java} sudo spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/ {code} {code:java} import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._ {code} {code:java} def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() } {code} {code:java} def motorQuoteCatatog = s"""{ |"table":{"namespace":"default", "name":"public.motor_product_quote", "tableCoder":"PrimitiveType"}, |"rowkey":"id", |"columns":{ |"id":{"cf":"rowkey", "col":"id", "type":"string"}, |"quote_id":{"cf":"motor_product_quote", "col":"quote_id", "type":"string"}, |"vehicle_id":{"cf":"motor_product_quote", "col":"vehicle_id", "type":"bigint"}, |"is_new":{"cf":"motor_product_quote", "col":"is_new", "type":"boolean"}, |"date_of_manufacture":{"cf":"motor_product_quote", "col":"date_of_manufacture", "type":"string"}, |"raw_data":{"cf":"motor_product_quote", "col":"raw_data", "type":"string"}, |"is_processed":{"cf":"motor_product_quote", "col":"is_processed", "type":"boolean"}, |"created_on":{"cf":"motor_product_quote", "col":"created_on", "type":"string"}, |"type":{"cf":"motor_product_quote", "col":"type", "type":"string"}, |"requirement_id":{"cf":"motor_product_quote", "col":"requirement_id", "type":"int"}, |"previous_policy_id":{"cf":"motor_product_quote", "col":"type", "previous_policy_id":"int"}, 
|"parent_quote_id":{"cf":"motor_product_quote", "col":"type", "parent_quote_id":"int"}, |"ticket_id":{"cf":"motor_product_quote", "col":"type", "ticket_id":"int"}, |"tracker_id":{"cf":"motor_product_quote", "col":"tracker_id", "type":"int"}, |"category":{"cf":"motor_product_quote", "col":"category", "type":"string"}, |"sales_channel_id":{"cf":"motor_product_quote", "col":"sales_channel_id", "type":"int"}, |"policy_type":{"cf":"motor_product_quote", "col":"policy_type", "type":"string"}, |"original_quote_created_by_id":{"cf":"motor_product_quote", "col":"type", "original_quote_created_by_id":"int"}, |"created_by_id":{"cf":"motor_product_quote", "col":"created_by_id", "type":"int"}, |"mobile":{"cf":"motor_product_quote", "col":"mobile", "type":"string"}, |"registration_number":{"cf":"motor_product_quote", "col":"registration_number", "type":"string"} |} |}""".stripMargin {code} {code:java} val df = withCatalog(motorQuoteCatatog){code} {code:java} java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) at scala.None$.get(Option.scala:345) at org.apache.spark.sql.execution.datasources.hbase.Field.<init>(HBaseTableCatalog.scala:102) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$$anonfun$apply$3.apply(HBaseTableCatalog.scala:286) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$$anonfun$apply$3.apply(HBaseTableCatalog.scala:281) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:281) at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80) at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239) at 
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) at withCatalog(<console>:38) ... 55 elided {code}
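Before calling `load()`, a catalog like the one above can be sanity-checked by verifying that every column entry has the `cf`, `col`, and `type` keys. In the reported catalog, the entries for `previous_policy_id`, `parent_quote_id`, `ticket_id`, and `original_quote_created_by_id` have no `"type"` key (the column name appears where `"type"` is expected), and all of them map to `"col":"type"`. That these entries trigger the `None.get` in `Field.<init>` is an assumption based on the stack trace, not something confirmed in the ticket. The validator below is a hypothetical plain-Python helper, not part of SHC, and its required-key list is also an assumption.

```python
import json
from typing import Dict, List, Tuple

# Assumption: keys each column entry needs when no Avro schema is used.
REQUIRED_KEYS = ("cf", "col", "type")


def find_bad_columns(catalog_json: str) -> Dict[str, List[str]]:
    """Map each column name to the required keys it is missing."""
    catalog = json.loads(catalog_json)
    problems = {}
    for name, spec in catalog["columns"].items():
        missing = [k for k in REQUIRED_KEYS if k not in spec]
        if missing:
            problems[name] = missing
    return problems


def find_shared_qualifiers(catalog_json: str) -> Dict[Tuple[str, str], List[str]]:
    """Group column names that point at the same (cf, col) pair."""
    catalog = json.loads(catalog_json)
    seen: Dict[Tuple[str, str], List[str]] = {}
    for name, spec in catalog["columns"].items():
        seen.setdefault((spec.get("cf"), spec.get("col")), []).append(name)
    return {key: names for key, names in seen.items() if len(names) > 1}


# Trimmed excerpt of the reported catalog.
sample = """{
  "table": {"namespace": "default", "name": "public.motor_product_quote", "tableCoder": "PrimitiveType"},
  "rowkey": "id",
  "columns": {
    "id": {"cf": "rowkey", "col": "id", "type": "string"},
    "type": {"cf": "motor_product_quote", "col": "type", "type": "string"},
    "ticket_id": {"cf": "motor_product_quote", "col": "type", "ticket_id": "int"},
    "tracker_id": {"cf": "motor_product_quote", "col": "tracker_id", "type": "int"}
  }
}"""
```

Presumably the corrected entry would look like `"ticket_id":{"cf":"motor_product_quote", "col":"ticket_id", "type":"int"}`, assuming the HBase qualifier matches the column name.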
[jira] [Commented] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
[ https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725104#comment-16725104 ] Kaspar Tint commented on SPARK-26396: - So if there are four group IDs all consuming the same topic with 90 partitions, 360 per JVM should be correct, right? > Kafka consumer cache overflow since 2.4.x > - > > Key: SPARK-26396 > URL: https://issues.apache.org/jira/browse/SPARK-26396 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 > Environment: Spark 2.4 standalone client mode >Reporter: Kaspar Tint >Priority: Major > > We are experiencing an issue where the Kafka consumer cache seems to overflow > constantly upon starting the application. This issue appeared after upgrading > to Spark 2.4. > We would get constant warnings like this: > {code:java} > 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max > capacity of 180, removing consumer for > CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76) > 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max > capacity of 180, removing consumer for > CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30) > 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max > capacity of 180, removing consumer for > CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57) > 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max > capacity of 180, removing consumer for > CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43) > {code} > This application is running 4 different Spark Structured Streaming queries > against the same Kafka topic that has 90 partitions.
We used to run it with > just the default settings, so the cache size defaulted to 64 on Spark 2.3, but > now we have tried setting it to 180 or 360. With 360 we get much less noise > about the overflow, but resource needs increase substantially.
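The arithmetic behind this question can be sketched in plain Python. It uses the rule stated by Gabor Somogyi in this thread: one cached consumer per (topic, partition, group.id) inside a JVM. The worst case for a single executor is that every query's group reads every partition there. `cache_keys` is a hypothetical helper, and the group IDs are made up. The assumption that each of the 4 queries uses its own group.id comes from the question above. In practice an executor only caches consumers for the partitions its tasks actually read.

```python
from itertools import product
from typing import Dict, Iterable, Set, Tuple

# Hypothetical cache key shape: (group.id, topic, partition).
CacheKey = Tuple[str, str, int]


def cache_keys(group_ids: Iterable[str],
               topic_partitions: Dict[str, int]) -> Set[CacheKey]:
    """Distinct consumer-cache keys one JVM would hold if its tasks read
    every partition of every topic for every group."""
    return {
        (group, topic, partition)
        for group, (topic, num_partitions) in product(group_ids, topic_partitions.items())
        for partition in range(num_partitions)
    }


# Four queries on one 90-partition topic, each with its own group.id.
groups = [f"spark-kafka-source-query-{i}" for i in range(4)]
worst_case = len(cache_keys(groups, {"kafka-topic": 90}))
single_query = len(cache_keys(groups[:1], {"kafka-topic": 90}))
capacity_seen_in_logs = 180
evictions_expected = worst_case > capacity_seen_in_logs
```

Under these assumptions the worst case is 4 × 90 = 360 entries per JVM. That exceeds the capacity of 180 shown in the warnings, so evictions are expected. The capacity is the setting the reporter raised to 180 or 360, assumed here to be `spark.sql.kafkaConsumerCache.capacity`.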
[jira] [Commented] (SPARK-26216) Do not use case class as public API (ScalaUDF)
[ https://issues.apache.org/jira/browse/SPARK-26216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725099#comment-16725099 ] ASF GitHub Bot commented on SPARK-26216: cloud-fan opened a new pull request #23351: [SPARK-26216][SQL][followup] use abstract class instead of trait for UserDefinedFunction URL: https://github.com/apache/spark/pull/23351 ## What changes were proposed in this pull request? A followup of https://github.com/apache/spark/pull/23178, to keep binary compatibility by using an abstract class. ## How was this patch tested? Manual test. I created a simple app with Spark 2.4:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object TryUDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    import spark.implicits._
    val f1 = udf((i: Int) => i + 1)
    spark.range(10).select(f1($"id")).show()
    spark.stop()
  }
}
```
When I run it with the current master, it fails with:
```
java.lang.IncompatibleClassChangeError: Found interface org.apache.spark.sql.expressions.UserDefinedFunction, but class was expected
```
When I run it with this PR, it works. > Do not use case class as public API (ScalaUDF) > -- > > Key: SPARK-26216 > URL: https://issues.apache.org/jira/browse/SPARK-26216 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Labels: release-notes > Fix For: 3.0.0
[jira] [Comment Edited] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
[ https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725063#comment-16725063 ] Gabor Somogyi edited comment on SPARK-26396 at 12/19/18 2:48 PM: - Number of executors can be set with {{--num-executors parameter, so it can be set differently for PROD and DEV.}} {{There is no specific formula. One cached consumer will be created and stored inside a JVM in cache per (topic + partition + group.id). was (Author: gsomogyi): Number of executors can be set with {{--num-executors parameter, so it can be set differently for PROD and DEV.}} {{There is no specific formula. One cached consumer will be created and stored inside a JVM in cache per (topic + partition + }}group.id).
[jira] [Comment Edited] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
[ https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725063#comment-16725063 ] Gabor Somogyi edited comment on SPARK-26396 at 12/19/18 2:50 PM: - Number of executors can be set with --num-executors parameter, so it can be set differently for PROD and DEV. There is no specific formula. One cached consumer will be created and stored inside a JVM cache per (topic + partition + group.id). was (Author: gsomogyi): Number of executors can be set with --num-executors parameter, so it can be set differently for PROD and DEV. There is no specific formula. One cached consumer will be created and stored inside a JVM in cache per (topic + partition + group.id).
[jira] [Comment Edited] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
[ https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725063#comment-16725063 ]

Gabor Somogyi edited comment on SPARK-26396 at 12/19/18 2:49 PM:
-----------------------------------------------------------------

Number of executors can be set with --num-executors parameter, so it can be set differently for PROD and DEV.
There is no specific formula. One cached consumer will be created and stored inside a JVM in cache per (topic + partition + group.id).

was (Author: gsomogyi):
Number of executors can be set with {{--num-executors}} parameter, so it can be set differently for PROD and DEV.
There is no specific formula. One cached consumer will be created and stored inside a JVM in cache per (topic + partition + group.id).
[jira] [Commented] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
[ https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725063#comment-16725063 ]

Gabor Somogyi commented on SPARK-26396:
---------------------------------------

Number of executors can be set with {{--num-executors}} parameter, so it can be set differently for PROD and DEV.
There is no specific formula. One cached consumer will be created and stored inside a JVM in cache per (topic + partition + group.id).
[jira] [Comment Edited] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
[ https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725050#comment-16725050 ]

Kaspar Tint edited comment on SPARK-26396 at 12/19/18 2:39 PM:
---------------------------------------------------------------

Is there an exact formula to use for this, considering that the application can have many different queries? We don't need that many executors in dev, for instance, but in production we do have plenty of them.

was (Author: tint):
Any exact formula to use for this when considering that the application can have many different queries?
[jira] [Commented] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
[ https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725050#comment-16725050 ]

Kaspar Tint commented on SPARK-26396:
-------------------------------------

Any exact formula to use for this when considering that the application can have many different queries?
[jira] [Comment Edited] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
[ https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725021#comment-16725021 ]

Gabor Somogyi edited comment on SPARK-26396 at 12/19/18 2:07 PM:
-----------------------------------------------------------------

[~Tint] it seems like you're trying to scale your application vertically, which requires a really strong machine. Try to scale horizontally and add more executors. This way the load will be split up, which reduces the number of cached consumers per JVM.

was (Author: gsomogyi):
[~Tint] seems like you're trying to scale your application vertically which requires really strong machine. Try to scale horizontally and add further executors. This way the load will be split up. This will reduce the amount of cached consumers per JVM.
[jira] [Commented] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
[ https://issues.apache.org/jira/browse/SPARK-26396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725021#comment-16725021 ]

Gabor Somogyi commented on SPARK-26396:
---------------------------------------

[~Tint] seems like you're trying to scale your application vertically which requires really strong machine. Try to scale horizontally and add further executors. This way the load will be split up. This will reduce the amount of cached consumers per JVM.
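Gabor notes that there is no specific formula. Still, a rough back-of-the-envelope estimate shows why adding executors lowers the number of cached consumers per JVM.

The Scala sketch below assumes three things that Spark does not guarantee:

- Each query has its own group.id.
- Partitions are spread evenly over executors.
- A given partition keeps landing on the same executor for every query.

ConsumerCacheSizing and its methods are hypothetical names. In Spark 2.4 the cache capacity is assumed here to be controlled by spark.sql.kafkaConsumerCache.capacity (default 64). Please verify that against the documentation for your version.

```scala
object ConsumerCacheSizing {
  // Distinct cache keys across the whole cluster: one per (group.id, topic, partition),
  // assuming every query has its own group.id and reads all partitions of the topic.
  def totalKeys(queries: Int, partitions: Int): Int = queries * partitions

  // Rough keys per executor JVM, assuming partitions are spread evenly over executors
  // and each partition keeps landing on the same executor for every query.
  def keysPerExecutor(queries: Int, partitions: Int, executors: Int): Int = {
    require(executors > 0, "executors must be positive")
    val partitionsPerExecutor = (partitions + executors - 1) / executors // ceiling division
    queries * partitionsPerExecutor
  }

  def main(args: Array[String]): Unit = {
    val capacity = 64 // the default cache size mentioned in the report
    for (executors <- Seq(1, 2, 3, 6)) {
      val perJvm = keysPerExecutor(queries = 4, partitions = 90, executors = executors)
      println(s"executors=$executors keysPerJvm=$perJvm withinCapacity=${perJvm <= capacity}")
    }
  }
}
```

With 4 queries on a 90-partition topic, this gives about 360 keys per JVM on 1 executor, 180 on 2, and 60 on 6. This is only an estimate: uneven scheduling or extra topics change the numbers.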
[jira] [Updated] (SPARK-26407) For an external non-partitioned table, if add a directory named with k=v to the table path, select result will be wrong
[ https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bao Yunz updated SPARK-26407:
-----------------------------
    Description: 
Scenario 1
Create an external non-partitioned table whose location directory contains a directory named "part=1", with schema (id, name), for example, and put some data in the "part=1" directory. When we then desc the table, we find that "part" has been added to the table schema as a column. Inserting two-column data into the table then throws an exception saying that the target table has 3 columns but the inserted data has 2 columns.

Scenario 2
Create an external non-partitioned table whose location path is empty, with schema (id, name), for example. After several insert operations, add a directory named "part=1" in the table location directory and put some data in it. When we then run insert and select operations, we find that the scan path has changed to "tablePath/part=1", so we get a wrong result.

The right logic should be that, if a table is non-partitioned, adding a partition-like folder under tablePath should not change its schema or select result.

  was:
Scene 1
Create an external non-partition table, in which location directory has a directory named with "part=1", for example. Then desc the table, we will find the string "part" is showed in table column. when insert the table with data which has same column with target table , will throw a exception that target table has different column number with the inserted data.

Scene 2
Create a external non-partition table, which location path is empty. After several times insert operation, we add a directory named with "part=1" in the table location directory. Then do insert and select operation, we will find the scan path is changed to "tablePath/part=1",so that we will get a wrong result.

It seems that the existing logic of spark will process this kind of table like a partition table. But when we do show partitions operation, it will throw the exception that the table is not partitioned, which is confusing. We believe that the normal logic should be that if a table is a non-partitioned table, the folder under tablePath should not change its basic properties.

> For an external non-partitioned table, if add a directory named with k=v to
> the table path, select result will be wrong
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-26407
>                 URL: https://issues.apache.org/jira/browse/SPARK-26407
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Bao Yunz
>            Priority: Major
>              Labels: usability
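Both scenarios hinge on Hive-style partition discovery, in which a directory named key=value under the table path is read as a partition column. The Scala sketch below is a simplified, hypothetical model and not Spark's partition-inference code; PartitionPathSketch and its methods are illustrative names. It contrasts two behaviours:

- As reported: the partition-like directory adds a column.
- As requested in the issue: a table declared as non-partitioned ignores such directories.

```scala
object PartitionPathSketch {
  // One Hive-style "key=value" path segment, e.g. "part=1".
  private val KeyValue = "([^=/]+)=([^/]*)".r

  // Collects key=value directory names between the table root and a data file.
  def partitionSegments(relativeFilePath: String): Seq[(String, String)] = {
    val dirs = relativeFilePath.split("/").toSeq.dropRight(1) // drop the file name
    dirs.collect { case KeyValue(key, value) => key -> value }
  }

  // Behaviour as reported: partition-like directories add columns even though
  // the table was created as non-partitioned.
  def schemaAsReported(declared: Seq[String], relativeFilePath: String): Seq[String] =
    declared ++ partitionSegments(relativeFilePath).map(_._1).filterNot(c => declared.contains(c))

  // Behaviour requested in the issue: only partitioned tables infer partition columns.
  def schemaAsRequested(declared: Seq[String], isPartitioned: Boolean, relativeFilePath: String): Seq[String] =
    if (isPartitioned) schemaAsReported(declared, relativeFilePath) else declared

  def main(args: Array[String]): Unit = {
    val declared = Seq("id", "name")
    val file = "part=1/part-00000.parquet"
    println(schemaAsReported(declared, file))
    println(schemaAsRequested(declared, isPartitioned = false, file))
  }
}
```

Take declared columns (id, name) and a data file under part=1/. The reported behaviour yields id, name, part, mirroring the extra column in Scenario 1. The requested behaviour keeps id, name.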
[jira] [Updated] (SPARK-26407) For an external non-partitioned table, if add a directory named with k=v to the table path, select result will be wrong
[ https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bao Yunz updated SPARK-26407:
-----------------------------
    Affects Version/s:     (was: 2.3.2)
[jira] [Updated] (SPARK-26407) For an external non-partitioned table, if add a directory named with k=v to the table path, select result will be wrong
[ https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bao Yunz updated SPARK-26407:
-----------------------------
    Description: 
Scene 1
Create an external non-partition table, in which location directory has a directory named with "part=1", for example. Then desc the table, we will find the string "part" is showed in table column. when insert the table with data which has same column with target table , will throw a exception that target table has different column number with the inserted data.

Scene 2
Create a external non-partition table, which location path is empty. After several times insert operation, we add a directory named with "part=1" in the table location directory. Then do insert and select operation, we will find the scan path is changed to "tablePath/part=1",so that we will get a wrong result.

It seems that the existing logic of spark will process this kind of table like a partition table. But when we do show partitions operation, it will throw the exception that the table is not partitioned, which is confusing. We believe that the normal logic should be that if a table is a non-partitioned table, the folder under tablePath should not change its basic properties.

  was:
Scene 1
Create a external non-partition table, in which location directory has a directory named with "part=1", for example. Then desc the table, we will find the string "part" is showed in table column. when insert the table with data which has same column with target table , will throw a exception that target table has different column number with the inserted data.

Scene 2
Create a external non-partition table, which location path is empty. After several times insert operation, we add a directory named with "part=1" in the table location directory. Then do insert and select operation, we will find the scan path is changed to "tablePath/part=1",so that we will get a wrong result.

It seems that the existing logic of spark will process this kind of table like a partition table. But when we do show partitions operation, it will throw the exception that the table is not partitioned, which is confusing. We believe that the normal logic should be that if a table is a non-partitioned table, the folder under tablePath should not change its basic properties.
[jira] [Updated] (SPARK-26407) For an external non-partitioned table, if add a directory named with k=v to the table path, select result will be wrong
[ https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bao Yunz updated SPARK-26407:
-----------------------------
    Summary: For an external non-partitioned table, if add a directory named with k=v to the table path, select result will be wrong  (was: For an external non-partition table, if add a directory named with k=v to the table path, select result will be wrong)
[jira] [Updated] (SPARK-26407) For an external non-partition table, if add a directory named with k=v to the table path, select result will be wrong
[ https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bao Yunz updated SPARK-26407:
-----------------------------
    Summary: For an external non-partition table, if add a directory named with k=v to the table path, select result will be wrong  (was: Select result is incorrect when add a directory named with k=v to the table path of external non-partition table)
[jira] [Updated] (SPARK-26407) Select result is incorrect when add a directory named with k=v to the table path of external non-partition table
[ https://issues.apache.org/jira/browse/SPARK-26407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bao Yunz updated SPARK-26407:
-----------------------------
Description:
Scene 1
Create an external non-partitioned table whose location directory contains a directory named, for example, "part=1". When we DESC the table, "part" is shown as a table column. Inserting data with the same columns as the target table then throws an exception saying that the target table has a different number of columns from the inserted data.

Scene 2
Create an external non-partitioned table whose location path is empty. After several inserts, add a directory named "part=1" under the table location directory. If we then insert and select, we find that the scan path has changed to "tablePath/part=1", so we get a wrong result.

It seems that Spark's existing logic processes this kind of table as a partitioned table. But SHOW PARTITIONS throws an exception saying that the table is not partitioned, which is confusing. We believe the correct behavior is that, for a non-partitioned table, folders under tablePath should not change its basic properties.

was:
Scene 1
Create an external non-partitioned table whose location path contains a directory named, for example, "part=1". When we DESC the table, "part" is shown as a table column. Inserting data with the same columns as the target table then throws an exception saying that the target table has a different number of columns from the inserted data.

Scene 2
Create an external non-partitioned table whose location path is empty. After several inserts, add a directory named "part=1" under the table location directory. If we then insert and select, we find that the scan path has changed to "tablePath/part=1", so we get a wrong result.

It seems that Spark's existing logic processes this kind of table as a partitioned table. But SHOW PARTITIONS throws an exception saying that the table is not partitioned, which is confusing. We believe the correct behavior is that, for a non-partitioned table, folders under tablePath should not change its basic properties.

> Select result is incorrect when add a directory named with k=v to the table path of external non-partition table
>
> Key: SPARK-26407
> URL: https://issues.apache.org/jira/browse/SPARK-26407
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Bao Yunz
> Priority: Major
> Labels: usability
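A minimal plain-Scala sketch of why a `part=1` directory can be mistaken for a partition: discovery of this kind reads every intermediate `key=value` directory name between the table root and a data file as a partition column. `PartitionPathSketch`, `parseSegment` and `inferPartitionColumns` are hypothetical names; Spark's real partition discovery also performs type inference and consistency checks that are not modelled here.

```scala
object PartitionPathSketch {
  // Parses a directory name such as "part=1" into Some(("part", "1")).
  // Names without '=' or with an empty key (e.g. "=1") yield None.
  def parseSegment(segment: String): Option[(String, String)] = {
    val idx = segment.indexOf('=')
    if (idx <= 0) None
    else Some(segment.substring(0, idx) -> segment.substring(idx + 1))
  }

  // Returns the key=value pairs found in the directories between the table
  // root and a data file. A non-empty result means the layout "looks
  // partitioned" to this kind of discovery.
  def inferPartitionColumns(tableRoot: String, filePath: String): List[(String, String)] = {
    val relative = filePath.stripPrefix(tableRoot).stripPrefix("/")
    val dirs = relative.split('/').toList.dropRight(1) // drop the file name itself
    dirs.flatMap(d => parseSegment(d).toList)
  }

  def main(args: Array[String]): Unit = {
    val root = "/warehouse/ext_table" // hypothetical table location
    // A data file written directly under the table root: no partition columns.
    println(inferPartitionColumns(root, s"$root/part-00000.parquet"))
    // The same kind of file under a "part=1" directory: "part" now looks like a column.
    println(inferPartitionColumns(root, s"$root/part=1/part-00000.parquet"))
  }
}
```

Under this model the second path reports `part` as a column, which is consistent with the extra `part` column reported in Scene 1.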
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26407) Select result is incorrect when add a directory named with k=v to the table path of external non-partition table
Bao Yunz created SPARK-26407:
-----------------------------

Summary: Select result is incorrect when add a directory named with k=v to the table path of external non-partition table
Key: SPARK-26407
URL: https://issues.apache.org/jira/browse/SPARK-26407
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.4.0, 2.3.2
Reporter: Bao Yunz

Scene 1
Create an external non-partitioned table whose location path contains a directory named, for example, "part=1". When we DESC the table, "part" is shown as a table column. Inserting data with the same columns as the target table then throws an exception saying that the target table has a different number of columns from the inserted data.

Scene 2
Create an external non-partitioned table whose location path is empty. After several inserts, add a directory named "part=1" under the table location directory. If we then insert and select, we find that the scan path has changed to "tablePath/part=1", so we get a wrong result.

It seems that Spark's existing logic processes this kind of table as a partitioned table. But SHOW PARTITIONS throws an exception saying that the table is not partitioned, which is confusing. We believe the correct behavior is that, for a non-partitioned table, folders under tablePath should not change its basic properties.
[jira] [Updated] (SPARK-26406) Add option to skip rows when reading csv files
[ https://issues.apache.org/jira/browse/SPARK-26406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Kastl updated SPARK-26406:
---------------------------------
Description:
Real-world data can contain multiple header lines. Spark currently does not offer any way to skip more than one header row.
Several workarounds are proposed on Stack Overflow (manually editing each CSV file by adding "#" to the rows and using the comment option, or filtering after reading), but all of them have more or less obvious drawbacks and restrictions.
The option
{code:java}
header=True{code}
already treats the first row of CSV files differently, so in my opinion the argument that Spark wants to be row-order agnostic does not really hold here.
A solution like pandas'
{code:java}
skiprows={code}
would be highly preferable.

was:
Real-world data can contain multiple header lines. Spark currently does not offer any way to skip more than one header row.
Several workarounds are proposed on Stack Overflow (manually editing each CSV file by adding "#" to the rows and using the comment option, or filtering after reading), but all of them have more or less obvious drawbacks and restrictions.
The option
{code:java}
header=True{code}
already treats the first row of CSV files differently, so in my opinion the argument that Spark wants to be row-order agnostic does not really hold here.
A solution like pandas
{code:java}
skiprows={code}
would be highly preferable.

> Add option to skip rows when reading csv files
>
> Key: SPARK-26406
> URL: https://issues.apache.org/jira/browse/SPARK-26406
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.4.0
> Reporter: Thomas Kastl
> Priority: Minor
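To make the requested semantics concrete, here is a small sketch of what a pandas-style `skiprows` option could mean for CSV input, written in plain Scala over in-memory lines rather than against Spark's reader. `SkipRowsSketch`, `readWithSkip` and `readAll` are hypothetical; Spark 2.4 has no such option, and applying the skip per file is an assumption about how it would interact with `header`.

```scala
object SkipRowsSketch {
  // Hypothetical helper modelling pandas-style skiprows=n for one file:
  // drop the first n physical lines, then, if header is true, take the next
  // line as the header. Returns (header, data rows).
  def readWithSkip(lines: Seq[String], skipRows: Int, header: Boolean): (Option[String], Seq[String]) = {
    require(skipRows >= 0, "skipRows must be non-negative")
    val remaining = lines.drop(skipRows)
    if (header) (remaining.headOption, remaining.drop(1))
    else (None, remaining)
  }

  // Applies the skip to every file separately (assumption: each file in a
  // directory carries the same preamble) and concatenates the data rows.
  def readAll(files: Seq[Seq[String]], skipRows: Int, header: Boolean): Seq[String] =
    files.flatMap(file => readWithSkip(file, skipRows, header)._2)

  def main(args: Array[String]): Unit = {
    // Hypothetical file with two preamble lines before the real header.
    val file = Seq("# report title", "# units: metres", "id,name", "1,a", "2,b")
    val (hdr, rows) = readWithSkip(file, skipRows = 2, header = true)
    println(hdr)                  // Some(id,name)
    println(rows.mkString(" | ")) // 1,a | 2,b
  }
}
```

Applying the skip per file, rather than once per dataset, is the choice that fits the multi-header use case, where every file starts with the same preamble.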
[jira] [Updated] (SPARK-26406) Add option to skip rows when reading csv files
[ https://issues.apache.org/jira/browse/SPARK-26406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Kastl updated SPARK-26406:
---------------------------------
Description:
Real-world data can contain multiple header lines. Spark currently does not offer any way to skip more than one header row.
Several workarounds are proposed on Stack Overflow (manually editing each CSV file by adding "#" to the rows and using the comment option, or filtering after reading), but all of them have more or less obvious drawbacks and restrictions.
The option
{code:java}
header=True{code}
already treats the first row of CSV files differently, so in my opinion the argument that Spark wants to be row-order agnostic does not really hold here.
A solution like pandas
{code:java}
skiprows={code}
would be highly preferable.

was:
Real-world data can contain multiple header lines. Spark currently does not offer any way to skip more than one header row.
Several workarounds are proposed on Stack Overflow (manually editing each CSV file by adding "#" to the rows and using the comment option, or filtering after reading), but all of them have more or less obvious drawbacks and restrictions.
The option
{code:java}
header=True{code}
already treats the first row of CSV files differently, so in my opinion the argument that Spark wants to be row-agnostic does not really hold here.
A solution like pandas
{code:java}
skiprows={code}
would be highly preferable.

> Add option to skip rows when reading csv files
>
> Key: SPARK-26406
> URL: https://issues.apache.org/jira/browse/SPARK-26406
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.4.0
> Reporter: Thomas Kastl
> Priority: Minor
[jira] [Created] (SPARK-26406) Add option to skip rows when reading csv files
Thomas Kastl created SPARK-26406:
---------------------------------

Summary: Add option to skip rows when reading csv files
Key: SPARK-26406
URL: https://issues.apache.org/jira/browse/SPARK-26406
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 2.4.0
Reporter: Thomas Kastl

Real-world data can contain multiple header lines. Spark currently does not offer any way to skip more than one header row.
Several workarounds are proposed on Stack Overflow (manually editing each CSV file by adding "#" to the rows and using the comment option, or filtering after reading), but all of them have more or less obvious drawbacks and restrictions.
The option
{code:java}
header=True{code}
already treats the first row of CSV files differently, so in my opinion the argument that Spark wants to be row-agnostic does not really hold here.
A solution like pandas
{code:java}
skiprows={code}
would be highly preferable.
[jira] [Commented] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724868#comment-16724868 ]

ASF GitHub Bot commented on SPARK-26366:

mgaido91 opened a new pull request #23350: [SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False
URL: https://github.com/apache/spark/pull/23350

## What changes were proposed in this pull request?

`ReplaceExceptWithFilter` does not properly handle the case in which the condition evaluates to NULL. Since negating NULL still yields NULL, the assumption that negating the condition returns all the rows that did not satisfy it does not hold: rows for which the condition is NULL may not be returned. This happens when the constraints inferred by `InferFiltersFromConstraints` are not enough, as is the case with `OR` conditions.

The rule also had problems with non-deterministic conditions: in such a scenario, it would change the probability of the output.

The PR fixes these problems by:
- treating the condition as False when it is NULL (so that all the rows which did not satisfy it are returned);
- avoiding any transformation when the condition is non-deterministic.

## How was this patch tested?

Added UTs.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Except with transform regression
>
> Key: SPARK-26366
> URL: https://issues.apache.org/jira/browse/SPARK-26366
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 2.3.2
> Reporter: Dan Osipov
> Assignee: Marco Gaido
> Priority: Major
> Fix For: 2.4.1, 3.0.0
>
> There appears to be a regression between Spark 2.2 and 2.3. Below is the code to reproduce it:
>
> {code:java}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val inputDF = spark.sqlContext.createDataFrame(
>   spark.sparkContext.parallelize(Seq(
>     Row("0", "john", "smith", "j...@smith.com"),
>     Row("1", "jane", "doe", "j...@doe.com"),
>     Row("2", "apache", "spark", "sp...@apache.org"),
>     Row("3", "foo", "bar", null)
>   )),
>   StructType(List(
>     StructField("id", StringType, nullable=true),
>     StructField("first_name", StringType, nullable=true),
>     StructField("last_name", StringType, nullable=true),
>     StructField("email", StringType, nullable=true)
>   ))
> )
> val exceptDF = inputDF.transform( toProcessDF =>
>   toProcessDF.filter(
>     (
>       col("first_name").isin(Seq("john", "jane"): _*)
>         and col("last_name").isin(Seq("smith", "doe"): _*)
>     )
>       or col("email").isin(List(): _*)
>   )
> )
> inputDF.except(exceptDF).show()
> {code}
> Output with Spark 2.2:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> |  3|       foo|      bar|            null|
> +---+----------+---------+----------------+{noformat}
> Output with Spark 2.3:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> +---+----------+---------+----------------+{noformat}
> Note that changing the last line to
> {code:java}
> inputDF.except(exceptDF.cache()).show()
> {code}
> produces identical output in both Spark 2.3 and 2.2.
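The NULL problem described in the PR can be reproduced without Spark by modelling SQL's three-valued logic. In this sketch all names are hypothetical and `Option[Boolean]` stands in for a nullable SQL BOOLEAN: `exceptNaive` applies `NOT cond` as the pre-fix rewrite did, while `exceptFixed` applies `NOT COALESCE(cond, false)`, one way to express the fix's "treat NULL as False". The DISTINCT implied by EXCEPT is ignored because the sample rows are unique, and the condition mirrors the reproducer, assuming `x IN ()` yields NULL when `x` is NULL.

```scala
object ExceptNullSketch {
  // Some(true) / Some(false) / None model a nullable SQL BOOLEAN.
  type SqlBool = Option[Boolean]

  def not(a: SqlBool): SqlBool = a.map(b => !b) // NOT NULL is NULL

  def and(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }

  def or(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }

  // COALESCE(cond, false): treat a NULL condition as false.
  def nullAsFalse(a: SqlBool): SqlBool = a.orElse(Some(false))

  // WHERE keeps a row only when the predicate is exactly TRUE.
  def where[T](rows: Seq[T])(p: T => SqlBool): Seq[T] = rows.filter(r => p(r).contains(true))

  final case class Person(id: String, firstName: String, lastName: String, email: Option[String])

  // (first_name IN ('john','jane') AND last_name IN ('smith','doe')) OR email IN ()
  // Assumes x IN () is FALSE for non-NULL x and NULL for NULL x.
  def cond(p: Person): SqlBool =
    or(
      and(Some(Set("john", "jane").contains(p.firstName)), Some(Set("smith", "doe").contains(p.lastName))),
      p.email.map(_ => false))

  // Pre-fix rewrite: A EXCEPT Filter(cond, A)  =>  Filter(NOT cond, A).
  def exceptNaive(rows: Seq[Person]): Seq[Person] = where(rows)(p => not(cond(p)))

  // Post-fix rewrite: Filter(NOT COALESCE(cond, false), A).
  def exceptFixed(rows: Seq[Person]): Seq[Person] = where(rows)(p => not(nullAsFalse(cond(p))))

  // Reference semantics: rows of A that are not in Filter(cond, A).
  def exceptReference(rows: Seq[Person]): Seq[Person] = {
    val matched = where(rows)(cond).toSet
    rows.filterNot(matched)
  }

  def main(args: Array[String]): Unit = {
    // Sample rows shaped like the reproducer (e-mail values are made up).
    val input = Seq(
      Person("0", "john", "smith", Some("john@example.com")),
      Person("1", "jane", "doe", Some("jane@example.com")),
      Person("2", "apache", "spark", Some("spark@example.org")),
      Person("3", "foo", "bar", None))
    println(exceptNaive(input).map(_.id)) // row 3 is lost
    println(exceptFixed(input).map(_.id))
    println(exceptReference(input).map(_.id))
  }
}
```

For row 3 the condition is NULL, so `exceptNaive` drops it, while `exceptFixed` and `exceptReference` both keep rows 2 and 3, matching the Spark 2.2 output in the issue.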
[jira] [Created] (SPARK-26405) OOM
lu created SPARK-26405:
-----------------------

Summary: OOM
Key: SPARK-26405
URL: https://issues.apache.org/jira/browse/SPARK-26405
Project: Spark
Issue Type: Bug
Components: Java API, Scheduler, Shuffle, Spark Core, Spark Submit
Affects Versions: 2.2.0
Reporter: lu

Heap memory overflow occurred in the user portrait analysis; the volume of data analyzed was about 10 million records.
spark worker memory: 4G
job submitted using RestSubmissionClient
driver memory and executor memory: both 4g
total executor cores: 6
spark cores: 2
cluster size: 3

INFO worker.WorkerWatcher: Connecting to worker spark://Worker@192.168.44.181:45315
Exception in thread "broadcast-exchange-3" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:102)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:126)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
    at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:36)
    at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:68)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
    at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:88)
    at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:209)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.consume(objects.scala:107)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.doConsume(objects.scala:129)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
    at
[jira] [Commented] (SPARK-26403) DataFrame pivot using array column fails with "Unsupported literal type class"
[ https://issues.apache.org/jira/browse/SPARK-26403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724839#comment-16724839 ]

ASF GitHub Bot commented on SPARK-26403:

HyukjinKwon opened a new pull request #23349: [SPARK-26403][SQL] Support pivoting using array column for `pivot(column)` API
URL: https://github.com/apache/spark/pull/23349

## What changes were proposed in this pull request?

This PR makes `Literal(..: Any)` accept `collection.mutable.WrappedArray`, so that `pivot(Column)` can accept an array column as well. We can unwrap the array and use it for type dispatch.

```scala
val df = Seq(
  (2, Seq.empty[String]),
  (2, Seq("a", "x")),
  (3, Seq.empty[String]),
  (3, Seq("a", "x"))).toDF("x", "s")
df.groupBy("x").pivot("s").count().show()
```

Before:

```
Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray()
java.lang.RuntimeException: Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray()
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:80)
    at org.apache.spark.sql.RelationalGroupedDataset.$anonfun$pivot$2(RelationalGroupedDataset.scala:427)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:39)
    at scala.collection.TraversableLike.map(TraversableLike.scala:237)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:425)
    at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:406)
    at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:317)
    at org.apache.spark.sql.DataFramePivotSuite.$anonfun$new$1(DataFramePivotSuite.scala:341)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
```

After:

```
+---+---+------+
|  x| []|[a, x]|
+---+---+------+
|  3|  1|     1|
|  2|  1|     1|
+---+---+------+
```

## How was this patch tested?

Manually tested, and unit tests were added.

> DataFrame pivot using array column fails with "Unsupported literal type class"
>
> Key: SPARK-26403
> URL: https://issues.apache.org/jira/browse/SPARK-26403
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Huon Wilson
> Priority: Minor
>
> Doing a pivot (using the {{pivot(pivotColumn: Column)}} overload) on a column containing arrays results in a runtime error:
> {code:none}
> scala> val df = Seq((1, Seq("a", "x"), 2), (1, Seq("b"), 3), (2, Seq("a", "x"), 10), (3, Seq(), 100)).toDF("x", "s", "y")
> df: org.apache.spark.sql.DataFrame = [x: int, s: array ... 1 more field]
> scala> df.show
> +---+------+---+
> |  x|     s|  y|
> +---+------+---+
> |  1|[a, x]|  2|
> |  1|   [b]|  3|
> |  2|[a, x]| 10|
> |  3|    []|100|
> +---+------+---+
> scala> df.groupBy("x").pivot("s").agg(collect_list($"y")).show
> java.lang.RuntimeException: Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray()
>   at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
>   at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
>   at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at
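The PR's idea of unwrapping the array and dispatching on its contents can be sketched in plain Scala. This is not Spark's `Literal.apply`: `Lit`, `IntLit`, `StringLit`, `ArrayLit` and `toLit` are hypothetical, only a few element types are handled, and matching on `scala.collection.Seq` (which `WrappedArray` extends) is this sketch's assumption about how to recognise collected array values.

```scala
object PivotLiteralSketch {
  // Hypothetical, simplified literal model (not Spark's Literal class).
  sealed trait Lit
  final case class IntLit(value: Int) extends Lit
  final case class StringLit(value: String) extends Lit
  final case class ArrayLit(elements: Seq[Lit]) extends Lit

  // Type dispatch on a runtime value, recursing into array elements.
  def toLit(v: Any): Lit = v match {
    case i: Int      => IntLit(i)
    case s: String   => StringLit(s)
    case a: Array[_] => ArrayLit(a.toList.map(x => toLit(x)))
    // The fix idea: also unwrap Scala Seqs, such as the WrappedArray values
    // returned when an array column is collected, then dispatch on elements.
    case s: scala.collection.Seq[_] => ArrayLit(s.toList.map(x => toLit(x)))
    case other =>
      throw new RuntimeException(s"Unsupported literal type ${other.getClass} $other")
  }
}
```

With the extra `Seq` case, an empty `WrappedArray()` like the one in the error message maps to an empty `ArrayLit` instead of throwing.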
[jira] [Assigned] (SPARK-26403) DataFrame pivot using array column fails with "Unsupported literal type class"
[ https://issues.apache.org/jira/browse/SPARK-26403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26403:
------------------------------------

Assignee: Apache Spark

> DataFrame pivot using array column fails with "Unsupported literal type class"
>
> Key: SPARK-26403
> URL: https://issues.apache.org/jira/browse/SPARK-26403
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Huon Wilson
> Assignee: Apache Spark
> Priority: Minor
>
> Doing a pivot (using the {{pivot(pivotColumn: Column)}} overload) on a column containing arrays results in a runtime error:
> {code:none}
> scala> val df = Seq((1, Seq("a", "x"), 2), (1, Seq("b"), 3), (2, Seq("a", "x"), 10), (3, Seq(), 100)).toDF("x", "s", "y")
> df: org.apache.spark.sql.DataFrame = [x: int, s: array ... 1 more field]
> scala> df.show
> +---+------+---+
> |  x|     s|  y|
> +---+------+---+
> |  1|[a, x]|  2|
> |  1|   [b]|  3|
> |  2|[a, x]| 10|
> |  3|    []|100|
> +---+------+---+
> scala> df.groupBy("x").pivot("s").agg(collect_list($"y")).show
> java.lang.RuntimeException: Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray()
>   at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
>   at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
>   at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:419)
>   at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:397)
>   at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:317)
>   ... 49 elided
> {code}
> However, this doesn't seem to be a fundamental limitation of {{pivot}}, as it works fine with the {{pivot(pivotColumn: Column, values: Seq[Any])}} overload, as long as the arrays are mapped to the {{Array}} type:
> {code:none}
> scala> val rawValues = df.select("s").distinct.sort("s").collect
> rawValues: Array[org.apache.spark.sql.Row] = Array([WrappedArray()], [WrappedArray(a, x)], [WrappedArray(b)])
> scala> val values = rawValues.map(_.getSeq[String](0).to[Array])
> values: Array[Array[String]] = Array(Array(), Array(a, x), Array(b))
> scala> df.groupBy("x").pivot("s", values).agg(collect_list($"y")).show
> +---+-----+------+---+
> |  x|   []|[a, x]|[b]|
> +---+-----+------+---+
> |  1|   []|   [2]|[3]|
> |  3|[100]|    []| []|
> |  2|   []|  [10]| []|
> +---+-----+------+---+
> {code}
> It would be nice if {{pivot}} were more resilient to Spark's own representation of array columns, so that the first version worked.
[jira] [Assigned] (SPARK-26403) DataFrame pivot using array column fails with "Unsupported literal type class"
[ https://issues.apache.org/jira/browse/SPARK-26403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26403:
------------------------------------

Assignee: (was: Apache Spark)

> DataFrame pivot using array column fails with "Unsupported literal type class"
>
> Key: SPARK-26403
> URL: https://issues.apache.org/jira/browse/SPARK-26403
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Huon Wilson
> Priority: Minor
[jira] [Commented] (SPARK-26375) Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics
[ https://issues.apache.org/jira/browse/SPARK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724827#comment-16724827 ]

Wang, Gang commented on SPARK-26375:

This should be okay: a filter on partition columns is also treated as a normal filter, and its output statistics are estimated in the FilterEstimation class.

> Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics
>
> Key: SPARK-26375
> URL: https://issues.apache.org/jira/browse/SPARK-26375
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Wang, Gang
> Priority: Major
>
> In Catalyst, some optimizer rules are based on table statistics, such as ReorderJoin (in which star schemas are detected) and CostBasedJoinReorder. For these rules, accurate statistics are crucial. However, all of these rules currently fire before partition pruning, which may result in inaccurate statistics.
[jira] [Resolved] (SPARK-26375) Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics
[ https://issues.apache.org/jira/browse/SPARK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wang, Gang resolved SPARK-26375.
--------------------------------
Resolution: Not A Problem

> Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics
>
> Key: SPARK-26375
> URL: https://issues.apache.org/jira/browse/SPARK-26375
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Wang, Gang
> Priority: Major