[jira] [Commented] (SPARK-23291) SparkR : substr : In SparkR dataframe , starting and ending position arguments in "substr" is giving wrong result when the position is greater than 1

2018-03-29 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418599#comment-16418599
 ] 

Liang-Chi Hsieh commented on SPARK-23291:
-

Because it involves a behavior change, I'm hesitant to backport it to 2.3 right 
now.

> SparkR : substr : In SparkR dataframe , starting and ending position 
> arguments in "substr" is giving wrong result  when the position is greater 
> than 1
> --
>
> Key: SPARK-23291
> URL: https://issues.apache.org/jira/browse/SPARK-23291
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.1.2, 2.2.0, 2.2.1, 2.3.0
>Reporter: Narendra
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.4.0
>
>
> Defect Description :
> -
> For example, an input string "2017-12-01" is read into a SparkR DataFrame 
> "df" with a column named "col1".
>  The target is to create a new column named "col2" with the value "12", which 
> is inside the string. "12" can be extracted with a "starting position" of "6" 
> and an "ending position" of "7"
>  (the starting position of the first character is considered to be "1").
> But the code that currently has to be written is:
>  
>  df <- withColumn(df, "col2", substr(df$col1, 7, 8))
> Observe that the first argument of the "substr" API, which indicates the 
> 'starting position', is given as "7".
>  Also, observe that the second argument of the "substr" API, which indicates 
> the 'ending position', is given as "8".
> i.e. the number that has to be supplied to indicate a position is the 
> "actual position + 1".
> Expected behavior :
> 
> The code that needs to be written should be:
>  
>  df <- withColumn(df, "col2", substr(df$col1, 6, 7))
> Note :
> ---
>  This defect is observed only when the starting position is greater than 
> 1.
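
For reference, a minimal sketch of the equivalent 1-based extraction in the Scala 
DataFrame API (illustration only; the DataFrame and column names are assumed to 
match the example above):

{code:java}
// substring(str, pos, len) is also 1-based: position 6, length 2 extracts "12" from "2017-12-01".
import org.apache.spark.sql.functions.{col, substring}

val out = df.withColumn("col2", substring(col("col1"), 6, 2))
{code}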






[jira] [Commented] (SPARK-23784) Cannot use custom Aggregator with groupBy/agg

2018-03-29 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418558#comment-16418558
 ] 

Liang-Chi Hsieh commented on SPARK-23784:
-

I think your question has already been answered on Stack Overflow.

> Cannot use custom Aggregator with groupBy/agg 
> --
>
> Key: SPARK-23784
> URL: https://issues.apache.org/jira/browse/SPARK-23784
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Joshua Howard
>Priority: Major
>
> I have code 
> [here|http://stackoverflow.com/questions/49440766/trouble-getting-spark-aggregators-to-work]
>  where I am trying to use an Aggregator with both the select and agg 
> functions. I cannot seem to get this to work in Spark 2.3.0. 
> [Here|https://docs.cloud.databricks.com/docs/spark/1.6/examples/Dataset%20Aggregator.html]
>  is a blog post that appears to be using this functionality in Spark 1.6, but 
> it appears to no longer work. 
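
For reference, a minimal sketch of a typed {{Aggregator}} used through the typed 
{{select}}/{{groupByKey(...).agg(...)}} path (illustration only; the case class and 
column are made up, and this is neither the reporter's code nor the Stack Overflow 
answer):

{code:java}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Record(key: String, value: Long)

object SumValue extends Aggregator[Record, Long, Long] {
  def zero: Long = 0L                                  // initial buffer
  def reduce(b: Long, a: Record): Long = b + a.value   // fold one input row into the buffer
  def merge(b1: Long, b2: Long): Long = b1 + b2        // combine partial buffers
  def finish(reduction: Long): Long = reduction        // produce the final result
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// Typed usage on an assumed Dataset[Record] named ds:
// ds.select(SumValue.toColumn).show()
// ds.groupByKey(_.key).agg(SumValue.toColumn).show()
{code}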






[jira] [Resolved] (SPARK-23784) Cannot use custom Aggregator with groupBy/agg

2018-03-29 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh resolved SPARK-23784.
-
Resolution: Not A Problem

> Cannot use custom Aggregator with groupBy/agg 
> --
>
> Key: SPARK-23784
> URL: https://issues.apache.org/jira/browse/SPARK-23784
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Joshua Howard
>Priority: Major
>
> I have code 
> [here|http://stackoverflow.com/questions/49440766/trouble-getting-spark-aggregators-to-work]
>  where I am trying to use an Aggregator with both the select and agg 
> functions. I cannot seem to get this to work in Spark 2.3.0. 
> [Here|https://docs.cloud.databricks.com/docs/spark/1.6/examples/Dataset%20Aggregator.html]
>  is a blog post that appears to be using this functionality in Spark 1.6, but 
> it appears to no longer work. 






[jira] [Commented] (SPARK-23734) InvalidSchemaException While Saving ALSModel

2018-03-23 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410914#comment-16410914
 ] 

Liang-Chi Hsieh commented on SPARK-23734:
-

I use the latest master branch and can't reproduce the reported issue.

> InvalidSchemaException While Saving ALSModel
> 
>
> Key: SPARK-23734
> URL: https://issues.apache.org/jira/browse/SPARK-23734
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.3.0
> Environment: macOS 10.13.2
> Scala 2.11.8
> Spark 2.3.0
>Reporter: Stanley Poon
>Priority: Major
>  Labels: ALS, parquet, persistence
>
> After fitting an ALSModel, get following error while saving the model:
> Caused by: org.apache.parquet.schema.InvalidSchemaException: A group type can 
> not be empty. Parquet does not support empty group without leaves. Empty 
> group: spark_schema
> Exactly the same code ran ok on 2.2.1.
> Same issue also occurs on other ALSModels we have.
> h2. *To reproduce*
> Get ALSExample: 
> [https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala]
>  and add the following line to save the model right before "spark.stop".
> {quote}   model.write.overwrite().save("SparkExampleALSModel") 
> {quote}
> h2. Stack Trace
> Exception in thread "main" java.lang.ExceptionInInitializerError
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$setSchema$2.apply(ParquetWriteSupport.scala:444)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$setSchema$2.apply(ParquetWriteSupport.scala:444)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$.setSchema(ParquetWriteSupport.scala:444)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.prepareWrite(ParquetFileFormat.scala:112)
> at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:140)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
> at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
> at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
> at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
> at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
> at 
> org.apache.spark.ml.recommendation.ALSModel$ALSModelWriter.saveImpl(ALS.scala:510)
> at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:103)
> at com.vitalmove.model.ALSExample$.main(ALSExample.scala:83)
> at com.vitalmove.model.ALSExample.main(ALSExample.scala)
> Caused by: org.apache.parquet.schema.InvalidSchemaException: A group type can 
> not be empty. Parquet does not support empty group without leaves. Empty 
> group: spark_schema
> at org.apache.parquet.schema.GroupType.<init>(GroupType.java:92)
> at org.apache.parquet.schema.GroupType.<init>(GroupType.java:48)
> at org.apache.parquet.schema.MessageType.<init>(MessageType.java:50)
> at org.apache.parquet.schema.Types$MessageTypeBuilder.named(Types.java:1256)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.<init>(ParquetSchemaConverter.scala:567)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.<clinit>(ParquetSchemaConverter.scala)
>  




[jira] [Updated] (SPARK-23614) Union produces incorrect results when caching is used

2018-03-14 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-23614:

Component/s: (was: Spark Core)
 SQL

> Union produces incorrect results when caching is used
> -
>
> Key: SPARK-23614
> URL: https://issues.apache.org/jira/browse/SPARK-23614
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Morten Hornbech
>Priority: Major
>
> We just upgraded from 2.2 to 2.3 and our test suite caught this error:
> {code:java}
> case class TestData(x: Int, y: Int, z: Int)
> val frame = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 
> 6))).cache()
> val group1 = frame.groupBy("x").agg(min(col("y")) as "value")
> val group2 = frame.groupBy("x").agg(min(col("z")) as "value")
> group1.union(group2).show()
> // +---+-----+
> // |  x|value|
> // +---+-----+
> // |  1|    2|
> // |  4|    5|
> // |  1|    2|
> // |  4|    5|
> // +---+-----+
> group2.union(group1).show()
> // +---+-----+
> // |  x|value|
> // +---+-----+
> // |  1|    3|
> // |  4|    6|
> // |  1|    3|
> // |  4|    6|
> // +---+-----+
> {code}
> The error disappears if the first data frame is not cached or if the two 
> group bys use separate copies. I'm not sure exactly what happens inside 
> Spark, but errors that produce incorrect results rather than 
> exceptions always concern me.
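
For reference, a minimal sketch of the workaround mentioned above (illustration 
only, assuming the same {{TestData}} case class and {{session}}; it simply avoids 
sharing one cached Dataset between the two aggregations):

{code:java}
// Give each aggregation its own cached Dataset instead of reusing one `frame`.
import session.implicits._
import org.apache.spark.sql.functions.{col, min}

val rows = Seq(TestData(1, 2, 3), TestData(4, 5, 6))
val frameY = session.createDataset(rows).cache()
val frameZ = session.createDataset(rows).cache()

val group1 = frameY.groupBy("x").agg(min(col("y")) as "value")
val group2 = frameZ.groupBy("x").agg(min(col("z")) as "value")
group1.union(group2).show()   // each branch now aggregates the intended column
{code}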






[jira] [Created] (SPARK-23661) Implement treeAggregate on Dataset API

2018-03-12 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-23661:
---

 Summary: Implement treeAggregate on Dataset API
 Key: SPARK-23661
 URL: https://issues.apache.org/jira/browse/SPARK-23661
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


Many algorithms in MLlib still have not migrated their internal computing 
workload from {{RDD}} to {{DataFrame}}. {{treeAggregate}} is one of the obstacles 
we need to address in order to complete the migration.

This ticket is opened to provide {{treeAggregate}} on the Dataset API. For now this 
should be a private API used by the ML component.
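
For reference, a minimal sketch of the existing {{RDD.treeAggregate}} that MLlib 
relies on today; the Dataset-side API would presumably mirror this shape 
(illustration only, not the proposed implementation):

{code:java}
// Tree aggregation: partial results are merged pairwise over `depth` levels
// instead of every partition reducing straight onto the driver.
import org.apache.spark.rdd.RDD

def l2NormSquared(vectors: RDD[Array[Double]]): Double =
  vectors.treeAggregate(0.0)(
    seqOp  = (acc, v) => acc + v.map(x => x * x).sum,  // fold rows within each partition
    combOp = (a, b) => a + b,                          // merge partial sums
    depth  = 2)                                        // levels of the aggregation tree
{code}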






[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-03-05 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387191#comment-16387191
 ] 

Liang-Chi Hsieh commented on SPARK-22446:
-

Yeah, sounds good.

> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> ---
>
> Key: SPARK-22446
> URL: https://issues.apache.org/jira/browse/SPARK-22446
> Project: Spark
>  Issue Type: Bug
>  Components: ML, Optimizer
>Affects Versions: 2.0.2, 2.1.2, 2.2.1
> Environment: spark-shell, local mode, macOS Sierra 10.12.6
>Reporter: Greg Bellchambers
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.3.0
>
>
> In the following, the `indexer` UDF defined inside the 
> `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an 
> "Unseen label" error, despite the label not being present in the transformed 
> DataFrame.
> Here is the definition of the indexer UDF in the transform method:
> {code:java}
> val indexer = udf { label: String =>
>   if (labelToIndex.contains(label)) {
> labelToIndex(label)
>   } else {
> throw new SparkException(s"Unseen label: $label.")
>   }
> }
> {code}
> We can demonstrate the error with a very simple example DataFrame.
> {code:java}
> scala> import org.apache.spark.ml.feature.StringIndexer
> import org.apache.spark.ml.feature.StringIndexer
> scala> // first we create a DataFrame with three cities
> scala> val df = List(
>  | ("A", "London", "StrA"),
>  | ("B", "Bristol", null),
>  | ("C", "New York", "StrC")
>  | ).toDF("ID", "CITY", "CONTENT")
> df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more 
> field]
> scala> df.show
> +---++---+
> | ID|CITY|CONTENT|
> +---++---+
> |  A|  London|   StrA|
> |  B| Bristol|   null|
> |  C|New York|   StrC|
> +---++---+
> scala> // then we remove the row with null in CONTENT column, which removes 
> Bristol
> scala> val dfNoBristol = finalStatic.filter($"CONTENT".isNotNull)
> dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: 
> string, CITY: string ... 1 more field]
> scala> dfNoBristol.show
> +---++---+
> | ID|CITY|CONTENT|
> +---++---+
> |  A|  London|   StrA|
> |  C|New York|   StrC|
> +---++---+
> scala> // now create a StringIndexer for the CITY column and fit to 
> dfNoBristol
> scala> val model = {
>  | new StringIndexer()
>  | .setInputCol("CITY")
>  | .setOutputCol("CITYIndexed")
>  | .fit(dfNoBristol)
>  | }
> model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa2fb
> scala> // the StringIndexerModel has only two labels: "London" and "New York"
> scala> str.labels foreach println
> London
> New York
> scala> // transform our DataFrame to add an index column
> scala> val dfWithIndex = model.transform(dfNoBristol)
> dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 
> more fields]
> scala> dfWithIndex.show
> +---++---+---+
> | ID|CITY|CONTENT|CITYIndexed|
> +---++---+---+
> |  A|  London|   StrA|0.0|
> |  C|New York|   StrC|1.0|
> +---++---+---+
> {code}
> The unexpected behaviour comes when we filter `dfWithIndex` for `CITYIndexed` 
> equal to 1.0 and perform an action. The `indexer` UDF in `transform` method 
> throws an exception reporting unseen label "Bristol". This is irrational 
> behaviour as far as the user of the API is concerned, because there is no 
> such value as "Bristol" when do show all rows of `dfWithIndex`:
> {code:java}
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
> 17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
> org.apache.spark.SparkException: Failed to execute user defined 
> function($anonfun$5: (string) => double)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>   

[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-03-03 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384618#comment-16384618
 ] 

Liang-Chi Hsieh commented on SPARK-22446:
-

This fix uses a new API of {{UserDefinedFunction}}, {{asNondeterministic}}, which 
was added in 2.3.0. If we want to backport this fix, we need to backport the API 
too. It is not hard, but it involves SQL code. Should we backport it because of 
this fix?
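
For reference, a minimal sketch of what {{asNondeterministic}} looks like on a UDF 
(illustration only; this is not the actual patch, and the label map is made up):

{code:java}
// Marking the UDF as nondeterministic keeps the optimizer from reordering
// filters around it, so it only sees rows that survive the earlier filters.
import org.apache.spark.sql.functions.udf

val labelToIndex = Map("London" -> 0.0, "New York" -> 1.0)
val indexer = udf { label: String =>
  labelToIndex.getOrElse(label, -1.0)   // simplified; the real UDF throws on unseen labels
}.asNondeterministic()
{code}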

> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> ---
>
> Key: SPARK-22446
> URL: https://issues.apache.org/jira/browse/SPARK-22446
> Project: Spark
>  Issue Type: Bug
>  Components: ML, Optimizer
>Affects Versions: 2.0.2, 2.1.2, 2.2.1
> Environment: spark-shell, local mode, macOS Sierra 10.12.6
>Reporter: Greg Bellchambers
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.3.0
>
>
> In the following, the `indexer` UDF defined inside the 
> `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an 
> "Unseen label" error, despite the label not being present in the transformed 
> DataFrame.
> Here is the definition of the indexer UDF in the transform method:
> {code:java}
> val indexer = udf { label: String =>
>   if (labelToIndex.contains(label)) {
> labelToIndex(label)
>   } else {
> throw new SparkException(s"Unseen label: $label.")
>   }
> }
> {code}
> We can demonstrate the error with a very simple example DataFrame.
> {code:java}
> scala> import org.apache.spark.ml.feature.StringIndexer
> import org.apache.spark.ml.feature.StringIndexer
> scala> // first we create a DataFrame with three cities
> scala> val df = List(
>  | ("A", "London", "StrA"),
>  | ("B", "Bristol", null),
>  | ("C", "New York", "StrC")
>  | ).toDF("ID", "CITY", "CONTENT")
> df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more 
> field]
> scala> df.show
> +---++---+
> | ID|CITY|CONTENT|
> +---++---+
> |  A|  London|   StrA|
> |  B| Bristol|   null|
> |  C|New York|   StrC|
> +---++---+
> scala> // then we remove the row with null in CONTENT column, which removes 
> Bristol
> scala> val dfNoBristol = finalStatic.filter($"CONTENT".isNotNull)
> dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: 
> string, CITY: string ... 1 more field]
> scala> dfNoBristol.show
> +---++---+
> | ID|CITY|CONTENT|
> +---++---+
> |  A|  London|   StrA|
> |  C|New York|   StrC|
> +---++---+
> scala> // now create a StringIndexer for the CITY column and fit to 
> dfNoBristol
> scala> val model = {
>  | new StringIndexer()
>  | .setInputCol("CITY")
>  | .setOutputCol("CITYIndexed")
>  | .fit(dfNoBristol)
>  | }
> model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa2fb
> scala> // the StringIndexerModel has only two labels: "London" and "New York"
> scala> str.labels foreach println
> London
> New York
> scala> // transform our DataFrame to add an index column
> scala> val dfWithIndex = model.transform(dfNoBristol)
> dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 
> more fields]
> scala> dfWithIndex.show
> +---++---+---+
> | ID|CITY|CONTENT|CITYIndexed|
> +---++---+---+
> |  A|  London|   StrA|0.0|
> |  C|New York|   StrC|1.0|
> +---++---+---+
> {code}
> The unexpected behaviour comes when we filter `dfWithIndex` for `CITYIndexed` 
> equal to 1.0 and perform an action. The `indexer` UDF in `transform` method 
> throws an exception reporting unseen label "Bristol". This is irrational 
> behaviour as far as the user of the API is concerned, because there is no 
> such value as "Bristol" when we show all rows of `dfWithIndex`:
> {code:java}
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
> 17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
> org.apache.spark.SparkException: Failed to execute user defined 
> function($anonfun$5: (string) => double)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at scala.collection.Iter

[jira] [Commented] (SPARK-23471) RandomForestClassificationModel save() - incorrect metadata

2018-02-27 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16379516#comment-16379516
 ] 

Liang-Chi Hsieh commented on SPARK-23471:
-

I can't reproduce this. With `fit`, the params are copied and saved correctly.
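
For reference, a minimal sketch of the {{fit()}}-based flow referred to above 
(illustration only, assuming a suitably prepared {{data}} DataFrame; it mirrors the 
reporter's snippet rather than reproducing it):

{code:java}
// fit() copies the estimator's current params onto the returned model,
// so the saved metadata contains numTrees=100 and maxDepth=30.
import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setFeaturesCol("features")
  .setLabelCol("result")
  .setNumTrees(100)
  .setMaxDepth(30)

val model = rf.fit(data)                       // public API, unlike the internal train(...)
model.write.overwrite().save("/tmp/rf-model")  // path is illustrative
{code}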

> RandomForestClassificationModel save() - incorrect metadata
> ---
>
> Key: SPARK-23471
> URL: https://issues.apache.org/jira/browse/SPARK-23471
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Keepun
>Priority: Major
>
> RandomForestClassificationModel.load() does not work after save(): 
> {code:java}
> RandomForestClassifier rf = new RandomForestClassifier()
> .setFeaturesCol("features")
> .setLabelCol("result")
> .setNumTrees(100)
> .setMaxDepth(30)
> .setMinInstancesPerNode(1)
> //.setCacheNodeIds(true)
> .setMaxMemoryInMB(500)
> .setSeed(System.currentTimeMillis() + System.nanoTime());
> RandomForestClassificationModel rfmodel = rf.train(data);
>try {
>   rfmodel.save(args[2] + "." + System.currentTimeMillis());
>} catch (IOException e) {
>   LOG.error(e.getMessage(), e);
>   e.printStackTrace();
>}
> {code}
> File metadata\part-0: 
> {code:java}
> {"class":"org.apache.spark.ml.classification.RandomForestClassificationModel",
> "timestamp":1519136783983,"sparkVersion":"2.2.1","uid":"rfc_7c7e84ce7488",
> "paramMap":{"featureSubsetStrategy":"auto","cacheNodeIds":false,"impurity":"gini",
> "checkpointInterval":10,
> "numTrees":20,"maxDepth":5,
> "probabilityCol":"probability","labelCol":"label","featuresCol":"features",
> "maxMemoryInMB":256,"minInstancesPerNode":1,"subsamplingRate":1.0,
> "rawPredictionCol":"rawPrediction","predictionCol":"prediction","maxBins":32,
> "minInfoGain":0.0,"seed":-491520797},"numFeatures":1354,"numClasses":2,
> "numTrees":20}
> {code}
> should be:
> {code:java}
> "numTrees":100,"maxDepth":30,{code}
>  






[jira] [Commented] (SPARK-23448) Dataframe returns wrong result when column don't respect datatype

2018-02-24 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375439#comment-16375439
 ] 

Liang-Chi Hsieh commented on SPARK-23448:
-

In fact this is exactly the JSON parser's behavior, not a bug. We don't allow 
partial results for corrupted records. Except for the field configured by 
{{columnNameOfCorruptRecord}}, all fields will be set to {{null}}.
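
For reference, a minimal sketch of surfacing the raw corrupted line via 
{{columnNameOfCorruptRecord}} (illustration only; the column name and schema here 
are assumptions, not part of the ticket):

{code:java}
// The corrupt-record column must be part of the schema for it to be populated.
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("attr1", StringType, true),
  StructField("attr2", ArrayType(StringType, true), true),
  StructField("_corrupt_record", StringType, true)))

spark.read
  .schema(schema)
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(input)
  .show(false)
// The corrupted row keeps attr1/attr2 as null and carries the raw JSON line in _corrupt_record.
{code}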

> Dataframe returns wrong result when column don't respect datatype
> -
>
> Key: SPARK-23448
> URL: https://issues.apache.org/jira/browse/SPARK-23448
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
> Environment: Local
>Reporter: Ahmed ZAROUI
>Priority: Major
>
> I have the following JSON file that contains some noisy data (a String instead 
> of an Array):
>  
> {code:java}
> {"attr1":"val1","attr2":"[\"val2\"]"}
> {"attr1":"val1","attr2":["val2"]}
> {code}
> And I need to specify the schema programmatically like this:
>  
> {code:java}
> implicit val spark = SparkSession
>   .builder()
>   .master("local[*]")
>   .config("spark.ui.enabled", false)
>   .config("spark.sql.caseSensitive", "True")
>   .getOrCreate()
> import spark.implicits._
> val schema = StructType(
>   Seq(StructField("attr1", StringType, true),
>   StructField("attr2", ArrayType(StringType, true), true)))
> spark.read.schema(schema).json(input).collect().foreach(println)
> {code}
> The result given by this code is:
> {code:java}
> [null,null]
> [val1,WrappedArray(val2)]
> {code}
> Instead of putting null only in the corrupted column, all columns of the first 
> message are null.
>  
>  






[jira] [Commented] (SPARK-23390) Flaky Test Suite: FileBasedDataSourceSuite in Spark 2.3/hadoop 2.7

2018-02-21 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372360#comment-16372360
 ] 

Liang-Chi Hsieh commented on SPARK-23390:
-

{{FileBasedDataSourceSuite}} still seems to be flaky:

 

[https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87603/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/]

 

[https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87600/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/]

 

> Flaky Test Suite: FileBasedDataSourceSuite in Spark 2.3/hadoop 2.7
> --
>
> Key: SPARK-23390
> URL: https://issues.apache.org/jira/browse/SPARK-23390
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Sameer Agarwal
>Assignee: Wenchen Fan
>Priority: Major
>
> We're seeing multiple failures in {{FileBasedDataSourceSuite}} in 
> {{spark-branch-2.3-test-sbt-hadoop-2.7}}:
> {code}
> org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
> eventually never returned normally. Attempted 15 times over 
> 10.01215805999 seconds. Last failure message: There are 1 possibly leaked 
> file streams..
> {code}
> Here's the full history: 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/189/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/history/
> From a very quick look, these failures seem to be correlated with 
> https://github.com/apache/spark/pull/20479 (cc [~dongjoon]) as evident from 
> the following stack trace (full logs 
> [here|https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/189/console]):
>  
> {code}
> [info] - Enabling/disabling ignoreMissingFiles using orc (648 milliseconds)
> 15:55:58.673 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in 
> stage 61.0 (TID 85, localhost, executor driver): TaskKilled (Stage cancelled)
> 15:55:58.674 WARN org.apache.spark.DebugFilesystem: Leaked filesystem 
> connection created at:
> java.lang.Throwable
>   at 
> org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
>   at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>   at 
> org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.open(RecordReaderUtils.java:173)
>   at 
> org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:254)
>   at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:633)
>   at 
> org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.initialize(OrcColumnarBatchReader.java:138)
> {code}
> Also, while this might just be a false correlation, the frequency of these 
> test failures has increased considerably in 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/
>  after https://github.com/apache/spark/pull/20562 (cc 
> [~feng...@databricks.com]) was merged.
> The following is Parquet leakage.
> {code}
> Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
>   at 
> org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
>   at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:356)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:125)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:179)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
> {code}




[jira] [Updated] (SPARK-23448) Dataframe returns wrong result when column don't respect datatype

2018-02-20 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-23448:

Component/s: (was: Spark Core)
 SQL

> Dataframe returns wrong result when column don't respect datatype
> -
>
> Key: SPARK-23448
> URL: https://issues.apache.org/jira/browse/SPARK-23448
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
> Environment: Local
>Reporter: Ahmed ZAROUI
>Priority: Major
>
> I have the following JSON file that contains some noisy data (a String instead 
> of an Array):
>  
> {code:java}
> {"attr1":"val1","attr2":"[\"val2\"]"}
> {"attr1":"val1","attr2":["val2"]}
> {code}
> And I need to specify the schema programmatically like this:
>  
> {code:java}
> implicit val spark = SparkSession
>   .builder()
>   .master("local[*]")
>   .config("spark.ui.enabled", false)
>   .config("spark.sql.caseSensitive", "True")
>   .getOrCreate()
> import spark.implicits._
> val schema = StructType(
>   Seq(StructField("attr1", StringType, true),
>   StructField("attr2", ArrayType(StringType, true), true)))
> spark.read.schema(schema).json(input).collect().foreach(println)
> {code}
> The result given by this code is:
> {code:java}
> [null,null]
> [val1,WrappedArray(val2)]
> {code}
> Instead of putting null only in the corrupted column, all columns of the first 
> message are null.
>  
>  






[jira] [Commented] (SPARK-23455) Default Params in ML should be saved separately

2018-02-16 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368116#comment-16368116
 ] 

Liang-Chi Hsieh commented on SPARK-23455:
-

Currently, {{DefaultParamsWriter}} saves the following metadata + params:

 - class
 - timestamp
 - sparkVersion
 - uid
 - paramMap
 - (optionally, extra metadata)

User-supplied params and default params are all saved in the {{paramMap}} field in 
JSON. We can add a {{defaultParamMap}} field for saving the default params.

For backward compatibility, when loading a metadata file written prior to Spark 2.4, 
we shouldn't raise an error if the {{defaultParamMap}} field is missing.
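
A rough sketch of the proposed metadata layout (the concrete class, uid, and param 
values below are made up for illustration):

{code:java}
// Illustrative only: "paramMap" keeps user-supplied params, defaults move to "defaultParamMap".
val metadataJson =
  """{
    |  "class": "org.apache.spark.ml.feature.Bucketizer",
    |  "timestamp": 1518807000000,
    |  "sparkVersion": "2.4.0",
    |  "uid": "bucketizer_abc123",
    |  "paramMap": {"inputCol": "features"},
    |  "defaultParamMap": {"outputCol": "bucketizer_abc123__output"}
    |}""".stripMargin
{code}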

 

 

> Default Params in ML should be saved separately
> ---
>
> Key: SPARK-23455
> URL: https://issues.apache.org/jira/browse/SPARK-23455
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> We save ML's user-supplied params and default params as one entity in JSON. 
> When loading the saved models, we set all the loaded params on the created ML 
> model instances as user-supplied params.
> This causes some problems, e.g., if we strictly disallow some params to be set 
> at the same time, a default param can fail the param check because it is 
> treated as a user-supplied param after loading.
> The loaded default params should not be set as user-supplied params. We 
> should save ML default params separately in JSON.






[jira] [Created] (SPARK-23455) Default Params in ML should be saved separately

2018-02-16 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-23455:
---

 Summary: Default Params in ML should be saved separately
 Key: SPARK-23455
 URL: https://issues.apache.org/jira/browse/SPARK-23455
 Project: Spark
  Issue Type: Improvement
  Components: ML
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


We save ML's user-supplied params and default params as one entity in JSON. 
When loading the saved models, we set all the loaded params on the created ML 
model instances as user-supplied params.

This causes some problems, e.g., if we strictly disallow some params to be set at 
the same time, a default param can fail the param check because it is treated 
as a user-supplied param after loading.

The loaded default params should not be set as user-supplied params. We should 
save ML default params separately in JSON.






[jira] [Commented] (SPARK-23377) Bucketizer with multiple columns persistence bug

2018-02-13 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363367#comment-16363367
 ] 

Liang-Chi Hsieh commented on SPARK-23377:
-

I agree with what [~mlnick] said.

> Bucketizer with multiple columns persistence bug
> 
>
> Key: SPARK-23377
> URL: https://issues.apache.org/jira/browse/SPARK-23377
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Bago Amirbekian
>Priority: Critical
>
> A Bucketizer with multiple input/output columns gets "inputCol" set to the 
> default value on write -> read, which causes it to throw an error on 
> transform. Here's an example.
> {code:java}
> import org.apache.spark.ml.feature._
> val splits = Array(Double.NegativeInfinity, 0, 10, 100, 
> Double.PositiveInfinity)
> val bucketizer = new Bucketizer()
>   .setSplitsArray(Array(splits, splits))
>   .setInputCols(Array("foo1", "foo2"))
>   .setOutputCols(Array("bar1", "bar2"))
> val data = Seq((1.0, 2.0), (10.0, 100.0), (101.0, -1.0)).toDF("foo1", "foo2")
> bucketizer.transform(data)
> val path = "/temp/bucketrizer-persist-test"
> bucketizer.write.overwrite.save(path)
> val bucketizerAfterRead = Bucketizer.read.load(path)
> println(bucketizerAfterRead.isDefined(bucketizerAfterRead.outputCol))
> // This line throws an error because "outputCol" is set
> bucketizerAfterRead.transform(data)
> {code}
> And the trace:
> {code:java}
> java.lang.IllegalArgumentException: Bucketizer bucketizer_6f0acc3341f7 has 
> the inputCols Param set for multi-column transform. The following Params are 
> not applicable and should not be set: outputCol.
>   at 
> org.apache.spark.ml.param.ParamValidators$.checkExclusiveParams$1(params.scala:300)
>   at 
> org.apache.spark.ml.param.ParamValidators$.checkSingleVsMultiColumnParams(params.scala:314)
>   at 
> org.apache.spark.ml.feature.Bucketizer.transformSchema(Bucketizer.scala:189)
>   at 
> org.apache.spark.ml.feature.Bucketizer.transform(Bucketizer.scala:141)
>   at 
> line251821108a8a433da484ee31f166c83725.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-6079631:17)
> {code}






[jira] [Commented] (SPARK-23403) java.lang.ArrayIndexOutOfBoundsException: 10

2018-02-13 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362030#comment-16362030
 ] 

Liang-Chi Hsieh commented on SPARK-23403:
-

Have you checked the content of the CSV file? Are there any lines with missing 
fields?

> java.lang.ArrayIndexOutOfBoundsException: 10
> 
>
> Key: SPARK-23403
> URL: https://issues.apache.org/jira/browse/SPARK-23403
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 2.2.0
>Reporter: Naresh Kumar
>Priority: Major
>
> java.lang.ArrayIndexOutOfBoundsException: 10, while retrieving records from a 
> DataFrame in spark-shell






[jira] [Commented] (SPARK-23377) Bucketizer with multiple columns persistence bug

2018-02-12 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361724#comment-16361724
 ] 

Liang-Chi Hsieh commented on SPARK-23377:
-

For now, I think neither the 3rd option nor my current patch can easily go into 
2.3, because neither is a small change. So I also support having the fix for the 
2nd option first as a quick fix in 2.3. If there is no objection, I will prepare 
the fix as a PR soon.

> Bucketizer with multiple columns persistence bug
> 
>
> Key: SPARK-23377
> URL: https://issues.apache.org/jira/browse/SPARK-23377
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Bago Amirbekian
>Priority: Critical
>
> A Bucketizer with multiple input/output columns gets "inputCol" set to the 
> default value on write -> read, which causes it to throw an error on 
> transform. Here's an example.
> {code:java}
> import org.apache.spark.ml.feature._
> val splits = Array(Double.NegativeInfinity, 0, 10, 100, 
> Double.PositiveInfinity)
> val bucketizer = new Bucketizer()
>   .setSplitsArray(Array(splits, splits))
>   .setInputCols(Array("foo1", "foo2"))
>   .setOutputCols(Array("bar1", "bar2"))
> val data = Seq((1.0, 2.0), (10.0, 100.0), (101.0, -1.0)).toDF("foo1", "foo2")
> bucketizer.transform(data)
> val path = "/temp/bucketrizer-persist-test"
> bucketizer.write.overwrite.save(path)
> val bucketizerAfterRead = Bucketizer.read.load(path)
> println(bucketizerAfterRead.isDefined(bucketizerAfterRead.outputCol))
> // This line throws an error because "outputCol" is set
> bucketizerAfterRead.transform(data)
> {code}
> And the trace:
> {code:java}
> java.lang.IllegalArgumentException: Bucketizer bucketizer_6f0acc3341f7 has 
> the inputCols Param set for multi-column transform. The following Params are 
> not applicable and should not be set: outputCol.
>   at 
> org.apache.spark.ml.param.ParamValidators$.checkExclusiveParams$1(params.scala:300)
>   at 
> org.apache.spark.ml.param.ParamValidators$.checkSingleVsMultiColumnParams(params.scala:314)
>   at 
> org.apache.spark.ml.feature.Bucketizer.transformSchema(Bucketizer.scala:189)
>   at 
> org.apache.spark.ml.feature.Bucketizer.transform(Bucketizer.scala:141)
>   at 
> line251821108a8a433da484ee31f166c83725.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-6079631:17)
> {code}






[jira] [Commented] (SPARK-23377) Bucketizer with multiple columns persistence bug

2018-02-12 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361718#comment-16361718
 ] 

Liang-Chi Hsieh commented on SPARK-23377:
-

I have no objection to [~josephkb]'s proposal (the 2nd option first and the 3rd 
later).

The design question is whether we should keep the default values of the original 
Spark version when saving a model, or use the default values of the current Spark 
version when loading it. Keeping the default values of the original Spark version 
makes the behavior of saved models reproducible. However, the behavior of loaded 
models and of models created with the current Spark version can then differ. 
E.g., a model "foo" from 2.1 with default value "a" behaves reproducibly when 
loaded back into 2.3, but it behaves differently from the same kind of "foo" model 
created in 2.3 if the default value was changed to "b".

In other words, one option keeps a model's behavior consistent before and after 
persistence, even across Spark versions. The other lets the same kind of model 
behave consistently even when instances come from different Spark versions.

My current patch follows the latter. I think users should be aware of changes to 
default values in an upgraded Spark if they want to use old models. Btw, a rare but 
possible situation is that if we remove a default value in a new version, old 
models may not be easily loadable into the new Spark.

 

 

> Bucketizer with multiple columns persistence bug
> 
>
> Key: SPARK-23377
> URL: https://issues.apache.org/jira/browse/SPARK-23377
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Bago Amirbekian
>Priority: Critical
>
> A Bucketizer with multiple input/output columns gets "inputCol" set to the 
> default value on write -> read, which causes it to throw an error on 
> transform. Here's an example.
> {code:java}
> import org.apache.spark.ml.feature._
> val splits = Array(Double.NegativeInfinity, 0, 10, 100, 
> Double.PositiveInfinity)
> val bucketizer = new Bucketizer()
>   .setSplitsArray(Array(splits, splits))
>   .setInputCols(Array("foo1", "foo2"))
>   .setOutputCols(Array("bar1", "bar2"))
> val data = Seq((1.0, 2.0), (10.0, 100.0), (101.0, -1.0)).toDF("foo1", "foo2")
> bucketizer.transform(data)
> val path = "/temp/bucketrizer-persist-test"
> bucketizer.write.overwrite.save(path)
> val bucketizerAfterRead = Bucketizer.read.load(path)
> println(bucketizerAfterRead.isDefined(bucketizerAfterRead.outputCol))
> // This line throws an error because "outputCol" is set
> bucketizerAfterRead.transform(data)
> {code}
> And the trace:
> {code:java}
> java.lang.IllegalArgumentException: Bucketizer bucketizer_6f0acc3341f7 has 
> the inputCols Param set for multi-column transform. The following Params are 
> not applicable and should not be set: outputCol.
>   at 
> org.apache.spark.ml.param.ParamValidators$.checkExclusiveParams$1(params.scala:300)
>   at 
> org.apache.spark.ml.param.ParamValidators$.checkSingleVsMultiColumnParams(params.scala:314)
>   at 
> org.apache.spark.ml.feature.Bucketizer.transformSchema(Bucketizer.scala:189)
>   at 
> org.apache.spark.ml.feature.Bucketizer.transform(Bucketizer.scala:141)
>   at 
> line251821108a8a433da484ee31f166c83725.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-6079631:17)
> {code}






[jira] [Commented] (SPARK-23333) SparkML VectorAssembler.transform slow when needing to invoke .first() on sorted DataFrame

2018-02-08 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358048#comment-16358048
 ] 

Liang-Chi Hsieh commented on SPARK-23333:
-

Currently I think we don't have an API in Dataset to just fetch an arbitrary row 
back. Is it reasonable to add a {{def any(n: Int): Array[T]}} to Dataset? cc 
[~cloud_fan]

> SparkML VectorAssembler.transform slow when needing to invoke .first() on 
> sorted DataFrame
> --
>
> Key: SPARK-23333
> URL: https://issues.apache.org/jira/browse/SPARK-23333
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, MLlib, SQL
>Affects Versions: 2.2.1
>Reporter: V Luong
>Priority: Major
>
> Under certain circumstances, newDF = vectorAssembler.transform(oldDF) invokes 
> oldDF.first() in order to establish some metadata/attributes: 
> [https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala#L88.]
>  When oldDF is sorted, the above triggering of oldDF.first() can be very slow.
> For the purpose of establishing metadata, taking an arbitrary row from oldDF 
> will be just as good as taking oldDF.first(). Is there hence a way we can 
> speed up a great deal by somehow grabbing a random row, instead of relying on 
> oldDF.first()?






[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-02-07 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356525#comment-16356525
 ] 

Liang-Chi Hsieh commented on SPARK-22446:
-

2.0 and 2.1 also have this issue.

> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> ---
>
> Key: SPARK-22446
> URL: https://issues.apache.org/jira/browse/SPARK-22446
> Project: Spark
>  Issue Type: Bug
>  Components: ML, Optimizer
>Affects Versions: 2.0.2, 2.1.2, 2.2.1
> Environment: spark-shell, local mode, macOS Sierra 10.12.6
>Reporter: Greg Bellchambers
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.3.0
>
>
> In the following, the `indexer` UDF defined inside the 
> `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an 
> "Unseen label" error, despite the label not being present in the transformed 
> DataFrame.
> Here is the definition of the indexer UDF in the transform method:
> {code:java}
> val indexer = udf { label: String =>
>   if (labelToIndex.contains(label)) {
> labelToIndex(label)
>   } else {
> throw new SparkException(s"Unseen label: $label.")
>   }
> }
> {code}
> We can demonstrate the error with a very simple example DataFrame.
> {code:java}
> scala> import org.apache.spark.ml.feature.StringIndexer
> import org.apache.spark.ml.feature.StringIndexer
> scala> // first we create a DataFrame with three cities
> scala> val df = List(
>  | ("A", "London", "StrA"),
>  | ("B", "Bristol", null),
>  | ("C", "New York", "StrC")
>  | ).toDF("ID", "CITY", "CONTENT")
> df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more 
> field]
> scala> df.show
> +---++---+
> | ID|CITY|CONTENT|
> +---++---+
> |  A|  London|   StrA|
> |  B| Bristol|   null|
> |  C|New York|   StrC|
> +---++---+
> scala> // then we remove the row with null in CONTENT column, which removes 
> Bristol
> scala> val dfNoBristol = finalStatic.filter($"CONTENT".isNotNull)
> dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: 
> string, CITY: string ... 1 more field]
> scala> dfNoBristol.show
> +---++---+
> | ID|CITY|CONTENT|
> +---++---+
> |  A|  London|   StrA|
> |  C|New York|   StrC|
> +---++---+
> scala> // now create a StringIndexer for the CITY column and fit to 
> dfNoBristol
> scala> val model = {
>  | new StringIndexer()
>  | .setInputCol("CITY")
>  | .setOutputCol("CITYIndexed")
>  | .fit(dfNoBristol)
>  | }
> model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa2fb
> scala> // the StringIndexerModel has only two labels: "London" and "New York"
> scala> str.labels foreach println
> London
> New York
> scala> // transform our DataFrame to add an index column
> scala> val dfWithIndex = model.transform(dfNoBristol)
> dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 
> more fields]
> scala> dfWithIndex.show
> +---++---+---+
> | ID|CITY|CONTENT|CITYIndexed|
> +---++---+---+
> |  A|  London|   StrA|0.0|
> |  C|New York|   StrC|1.0|
> +---++---+---+
> {code}
> The unexpected behaviour comes when we filter `dfWithIndex` for `CITYIndexed` 
> equal to 1.0 and perform an action. The `indexer` UDF in `transform` method 
> throws an exception reporting unseen label "Bristol". This is irrational 
> behaviour as far as the user of the API is concerned, because there is no 
> such value as "Bristol" when we show all rows of `dfWithIndex`:
> {code:java}
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
> 17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
> org.apache.spark.SparkException: Failed to execute user defined 
> function($anonfun$5: (string) => double)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.sca

[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-02-07 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356506#comment-16356506
 ] 

Liang-Chi Hsieh commented on SPARK-22446:
-

Yes, this is an issue in Spark 2.2. Let me check the earlier versions.

> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> ---
>
> Key: SPARK-22446
> URL: https://issues.apache.org/jira/browse/SPARK-22446
> Project: Spark
>  Issue Type: Bug
>  Components: ML, Optimizer
>Affects Versions: 2.0.2, 2.1.2, 2.2.1
> Environment: spark-shell, local mode, macOS Sierra 10.12.6
>Reporter: Greg Bellchambers
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.3.0
>
>
> In the following, the `indexer` UDF defined inside the 
> `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an 
> "Unseen label" error, despite the label not being present in the transformed 
> DataFrame.
> Here is the definition of the indexer UDF in the transform method:
> {code:java}
> val indexer = udf { label: String =>
>   if (labelToIndex.contains(label)) {
> labelToIndex(label)
>   } else {
> throw new SparkException(s"Unseen label: $label.")
>   }
> }
> {code}
> We can demonstrate the error with a very simple example DataFrame.
> {code:java}
> scala> import org.apache.spark.ml.feature.StringIndexer
> import org.apache.spark.ml.feature.StringIndexer
> scala> // first we create a DataFrame with three cities
> scala> val df = List(
>  | ("A", "London", "StrA"),
>  | ("B", "Bristol", null),
>  | ("C", "New York", "StrC")
>  | ).toDF("ID", "CITY", "CONTENT")
> df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more 
> field]
> scala> df.show
> +---++---+
> | ID|CITY|CONTENT|
> +---++---+
> |  A|  London|   StrA|
> |  B| Bristol|   null|
> |  C|New York|   StrC|
> +---++---+
> scala> // then we remove the row with null in CONTENT column, which removes 
> Bristol
> scala> val dfNoBristol = finalStatic.filter($"CONTENT".isNotNull)
> dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: 
> string, CITY: string ... 1 more field]
> scala> dfNoBristol.show
> +---++---+
> | ID|CITY|CONTENT|
> +---++---+
> |  A|  London|   StrA|
> |  C|New York|   StrC|
> +---++---+
> scala> // now create a StringIndexer for the CITY column and fit to 
> dfNoBristol
> scala> val model = {
>  | new StringIndexer()
>  | .setInputCol("CITY")
>  | .setOutputCol("CITYIndexed")
>  | .fit(dfNoBristol)
>  | }
> model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa2fb
> scala> // the StringIndexerModel has only two labels: "London" and "New York"
> scala> str.labels foreach println
> London
> New York
> scala> // transform our DataFrame to add an index column
> scala> val dfWithIndex = model.transform(dfNoBristol)
> dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 
> more fields]
> scala> dfWithIndex.show
> +---++---+---+
> | ID|CITY|CONTENT|CITYIndexed|
> +---++---+---+
> |  A|  London|   StrA|0.0|
> |  C|New York|   StrC|1.0|
> +---++---+---+
> {code}
> The unexpected behaviour comes when we filter `dfWithIndex` for `CITYIndexed` 
> equal to 1.0 and perform an action. The `indexer` UDF in `transform` method 
> throws an exception reporting unseen label "Bristol". This is irrational 
> behaviour as far as the user of the API is concerned, because there is no 
> such value as "Bristol" when we show all rows of `dfWithIndex`:
> {code:java}
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
> 17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
> org.apache.spark.SparkException: Failed to execute user defined 
> function($anonfun$5: (string) => double)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at 
> org.apache.spark.scheduler.S

[jira] [Created] (SPARK-23284) Document several get API of ColumnVector's behavior when accessing null slot

2018-01-31 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-23284:
---

 Summary: Document several get API of ColumnVector's behavior when 
accessing null slot
 Key: SPARK-23284
 URL: https://issues.apache.org/jira/browse/SPARK-23284
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


We should clearly document the behavior of some ColumnVector get APIs such as 
getBinary, getStruct, getArray, etc., when accessing a null slot. Those APIs 
should return null if the slot is null.
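
For reference, a minimal sketch of the contract being documented (illustration 
only; the helper below is hypothetical):

{code:java}
// Hypothetical helper: reading from a null slot yields null rather than throwing.
import org.apache.spark.sql.vectorized.ColumnVector

def readBinary(vector: ColumnVector, rowId: Int): Array[Byte] =
  if (vector.isNullAt(rowId)) null      // null slot: getBinary/getStruct/getArray should return null
  else vector.getBinary(rowId)
{code}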

 

 






[jira] [Commented] (SPARK-23273) Spark Dataset withColumn - schema column order isn't the same as case class paramether order

2018-01-30 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346148#comment-16346148
 ] 

Liang-Chi Hsieh commented on SPARK-23273:
-

The {{name}} column is added after {{age}} in {{ds2}}, so the schema of 
{{ds2}} doesn't match {{ds1}} in column order. You can change the column 
order with a projection before the union:
{code:java}
scala> ds1.union(ds2.select("name", "age").as[NameAge]).show
+-------------+---+
|         name|age|
+-------------+---+
|henriquedsg89|  1|
+-------------+---+
{code}
Since 2.3.0, there is an API, {{unionByName}}, that can be used for this kind of case:
{code:java}
scala> ds1.unionByName(ds2).show
+-------------+---+
|         name|age|
+-------------+---+
|henriquedsg89|  1|
+-------------+---+
{code}

> Spark Dataset withColumn - schema column order isn't the same as case class 
> paramether order
> 
>
> Key: SPARK-23273
> URL: https://issues.apache.org/jira/browse/SPARK-23273
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Henrique dos Santos Goulart
>Priority: Major
>
> {code:java}
> case class OnlyAge(age: Int)
> case class NameAge(name: String, age: Int)
> val ds1 = spark.emptyDataset[NameAge]
> val ds2 = spark
>   .createDataset(Seq(OnlyAge(1)))
>   .withColumn("name", lit("henriquedsg89"))
>   .as[NameAge]
> ds1.show()
> ds2.show()
> ds1.union(ds2)
> {code}
>  
> It's going to raise this error:
> {noformat}
> Cannot up cast `age` from string to int as it may truncate
> The type path of the target object is:
> - field (class: "scala.Int", name: "age")
> - root class: "dw.NameAge"{noformat}
> It seems that .as[CaseClass] doesn't keep the order of parameters declared on 
> the case class.
> If I change the case class parameter order, it's going to work, like: 
> {code:java}
> case class NameAge(age: Int, name: String){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23224) union all will throw gramma exception

2018-01-26 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh resolved SPARK-23224.
-
Resolution: Not A Problem

> union all will throw gramma exception
> -
>
> Key: SPARK-23224
> URL: https://issues.apache.org/jira/browse/SPARK-23224
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: chenyukang
>Priority: Major
>
> When the keyword "limit" appears in the first sub-query, the query fails with a 
> grammar exception
> {code:java}
> spark-sql>
>  >
>  > insert overwrite table tmp_wjmdb.tmp_cyk_test1
>  > select * from tmp_wjmdb.abctest limit 10
>  > union all
>  > select * from tmp_wjmdb.abctest limit 20;
> 18/01/26 12:18:58 INFO SparkSqlParser: Parsing command: insert overwrite 
> table tmp_wjmdb.tmp_cyk_test1
> select * from tmp_wjmdb.abctest limit 10
> union all
> select * from tmp_wjmdb.abctest limit 20
> Error in query:
> mismatched input 'union' expecting {, '.', '[', 'OR', 'AND', 'IN', NOT, 
> 'BETWEEN', 'LIKE', RLIKE, 'IS', EQ, '<=>', '<>', '!=', '<', LTE, '>', GTE, 
> '+', '-', '*', '/', '%', 'DIV', '&', '|', '^'}(line 3, pos 0)
> == SQL ==
> insert overwrite table tmp_wjmdb.tmp_cyk_test1
> select * from tmp_wjmdb.abctest limit 10
> union all
> ^^^
> select * from tmp_wjmdb.abctest limit 20
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23224) union all will throw gramma exception

2018-01-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340746#comment-16340746
 ] 

Liang-Chi Hsieh commented on SPARK-23224:
-

I'd close this for now. You can reopen it if you find other problems.

> union all will throw gramma exception
> -
>
> Key: SPARK-23224
> URL: https://issues.apache.org/jira/browse/SPARK-23224
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: chenyukang
>Priority: Major
>
> When the keyword "limit" appears in the first sub-query, the query fails with a 
> grammar exception
> {code:java}
> spark-sql>
>  >
>  > insert overwrite table tmp_wjmdb.tmp_cyk_test1
>  > select * from tmp_wjmdb.abctest limit 10
>  > union all
>  > select * from tmp_wjmdb.abctest limit 20;
> 18/01/26 12:18:58 INFO SparkSqlParser: Parsing command: insert overwrite 
> table tmp_wjmdb.tmp_cyk_test1
> select * from tmp_wjmdb.abctest limit 10
> union all
> select * from tmp_wjmdb.abctest limit 20
> Error in query:
> mismatched input 'union' expecting {, '.', '[', 'OR', 'AND', 'IN', NOT, 
> 'BETWEEN', 'LIKE', RLIKE, 'IS', EQ, '<=>', '<>', '!=', '<', LTE, '>', GTE, 
> '+', '-', '*', '/', '%', 'DIV', '&', '|', '^'}(line 3, pos 0)
> == SQL ==
> insert overwrite table tmp_wjmdb.tmp_cyk_test1
> select * from tmp_wjmdb.abctest limit 10
> union all
> ^^^
> select * from tmp_wjmdb.abctest limit 20
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23224) union all will throw gramma exception

2018-01-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340741#comment-16340741
 ] 

Liang-Chi Hsieh commented on SPARK-23224:
-

[https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union#LanguageManualUnion-ApplyingSubclauses]

The rule is similar to Hive's. You can place the LIMIT clause inside a pair of 
parentheses that encloses the SELECT:
{code:java}

insert overwrite table tmp_wjmdb.tmp_cyk_test1
(select * from tmp_wjmdb.abctest limit 10)
union all
(select * from tmp_wjmdb.abctest limit 20);{code}

> union all will throw gramma exception
> -
>
> Key: SPARK-23224
> URL: https://issues.apache.org/jira/browse/SPARK-23224
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: chenyukang
>Priority: Major
>
> When the keyword "limit" appears in the first sub-query, the query fails with a 
> grammar exception
> {code:java}
> spark-sql>
>  >
>  > insert overwrite table tmp_wjmdb.tmp_cyk_test1
>  > select * from tmp_wjmdb.abctest limit 10
>  > union all
>  > select * from tmp_wjmdb.abctest limit 20;
> 18/01/26 12:18:58 INFO SparkSqlParser: Parsing command: insert overwrite 
> table tmp_wjmdb.tmp_cyk_test1
> select * from tmp_wjmdb.abctest limit 10
> union all
> select * from tmp_wjmdb.abctest limit 20
> Error in query:
> mismatched input 'union' expecting {, '.', '[', 'OR', 'AND', 'IN', NOT, 
> 'BETWEEN', 'LIKE', RLIKE, 'IS', EQ, '<=>', '<>', '!=', '<', LTE, '>', GTE, 
> '+', '-', '*', '/', '%', 'DIV', '&', '|', '^'}(line 3, pos 0)
> == SQL ==
> insert overwrite table tmp_wjmdb.tmp_cyk_test1
> select * from tmp_wjmdb.abctest limit 10
> union all
> ^^^
> select * from tmp_wjmdb.abctest limit 20
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23220) broadcast hint not applied in a streaming left anti join

2018-01-25 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340495#comment-16340495
 ] 

Liang-Chi Hsieh commented on SPARK-23220:
-

I can't reproduce it locally. I joined a stream with a static dataframe in a 
similar way, and the broadcast hint works as expected. Can you provide a 
small reproducible example? Thanks.
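
For reference, this is roughly what I tried (a spark-shell sketch; the rate 
source and column names are made up, only the join shape matches the report):

{code}
import org.apache.spark.sql.functions.broadcast

// Small static side, analogous to the "hostnames" relation in the report.
val hostnames = Seq(("host-1", "a"), ("host-2", "b")).toDF("hostname", "descr")

// Streaming side: a rate source with a derived "hostname" column.
val stream = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
  .selectExpr("concat('host-', cast(value % 10 as string)) as hostname")

// Left anti join with an explicit broadcast hint on the static side.
val joined = stream.join(broadcast(hostnames), Seq("hostname"), "leftanti")

val query = joined.writeStream.format("console").start()
query.awaitTermination(10000)  // the SQL tab shows a BroadcastHashJoin for this run
query.stop()
{code}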

> broadcast hint not applied in a streaming left anti join
> 
>
> Key: SPARK-23220
> URL: https://issues.apache.org/jira/browse/SPARK-23220
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
>Reporter: Mathieu DESPRIEE
>Priority: Major
> Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti-join between a stream 
> and a static dataframe. The static one is quite small (a few hundred rows), but 
> the query plan by default is a sort merge join.
>   
>  Sometimes we need to re-process some historical data, so we feed the same app 
> with a FileSource pointing to our S3 storage with all archives. In that 
> situation, the first mini-batch is quite heavy (several hundred thousand 
> input files), and the time spent in the sort-merge join is unacceptable. 
> Additionally the data is highly skewed, so partition sizes are completely uneven 
> and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is :
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>  +- Join LeftAnti, (hostname#3584 = hostname#190)
> :- Project [app_id, ...
> <-- snip -->
>+- StreamingExecutionRelation 
> FileStreamSource[s3://{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], 
> [app_id
>  <--snip--> ... 62 more fields]
> +- ResolvedHint isBroadcastable=true
>+- Relation[hostname#190,descr#191] 
> RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23173) from_json can produce nulls for fields which are marked as non-nullable

2018-01-22 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334005#comment-16334005
 ] 

Liang-Chi Hsieh commented on SPARK-23173:
-

+1 for option 1 too.
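
A minimal spark-shell sketch of the behaviour under discussion (the field names 
and JSON are made up; under option 1 the {{nullable = false}} in the passed 
schema would simply be ignored):

{code}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Schema declares "a" as non-nullable, but the JSON record does not contain it.
val schema = new StructType().add("a", IntegerType, nullable = false)

val parsed = Seq("""{"b": 1}""").toDF("json")
  .select(from_json($"json", schema).as("s"))

// The parsed struct carries a null for "a" even though the schema says non-nullable.
parsed.select($"s.a").show()
{code}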

> from_json can produce nulls for fields which are marked as non-nullable
> ---
>
> Key: SPARK-23173
> URL: https://issues.apache.org/jira/browse/SPARK-23173
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Herman van Hovell
>Priority: Major
>
> The {{from_json}} function uses a schema to convert a string into a Spark SQL 
> struct. This schema can contain non-nullable fields. The underlying 
> {{JsonToStructs}} expression does not check if a resulting struct respects 
> the nullability of the schema. This leads to very weird problems in consuming 
> expressions. In our case parquet writing would produce an illegal parquet 
> file.
> There are roughly two solutions here:
>  # Assume that each field in schema passed to {{from_json}} is nullable, and 
> ignore the nullability information set in the passed schema.
>  # Validate the object during runtime, and fail execution if the data is null 
> where we are not expecting this.
> I currently am slightly in favor of option 1, since this is the more 
> performant option and a lot easier to do.
> WDYT? cc [~rxin] [~marmbrus] [~hyukjin.kwon] [~brkyvz]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22935) Dataset with Java Beans for java.sql.Date throws CompileException

2018-01-17 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330207#comment-16330207
 ] 

Liang-Chi Hsieh edited comment on SPARK-22935 at 1/18/18 7:53 AM:
--

Can we close this one?


was (Author: viirya):
Can

> Dataset with Java Beans for java.sql.Date throws CompileException
> -
>
> Key: SPARK-22935
> URL: https://issues.apache.org/jira/browse/SPARK-22935
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1, 2.3.0
>Reporter: Kazuaki Ishizaki
>Priority: Major
>
> The following code can throw an exception with or without whole-stage codegen.
> {code}
>   public void SPARK22935() {
> Dataset cdr = spark
> .read()
> .format("csv")
> .option("header", "true")
> .option("inferSchema", "true")
> .option("delimiter", ";")
> .csv("CDR_SAMPLE.csv")
> .as(Encoders.bean(CDR.class));
> Dataset ds = cdr.filter((FilterFunction) x -> (x.timestamp != 
> null));
> long c = ds.count();
> cdr.show(2);
> ds.show(2);
> System.out.println("cnt=" + c);
>   }
> // CDR.java
> public class CDR implements java.io.Serializable {
>   public java.sql.Date timestamp;
>   public java.sql.Date getTimestamp() { return this.timestamp; }
>   public void setTimestamp(java.sql.Date timestamp) { this.timestamp = 
> timestamp; }
> }
> // CDR_SAMPLE.csv
> timestamp
> 2017-10-29T02:37:07.815Z
> 2017-10-29T02:38:07.815Z
> {code}
> result
> {code}
> 12:17:10.352 ERROR 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to 
> compile: org.codehaus.commons.compiler.CompileException: File 
> 'generated.java', Line 61, Column 70: No applicable constructor/method found 
> for actual parameters "long"; candidates are: "public static java.sql.Date 
> org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 61, Column 70: No applicable constructor/method found for actual parameters 
> "long"; candidates are: "public static java.sql.Date 
> org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22935) Dataset with Java Beans for java.sql.Date throws CompileException

2018-01-17 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330207#comment-16330207
 ] 

Liang-Chi Hsieh edited comment on SPARK-22935 at 1/18/18 7:52 AM:
--

Can


was (Author: viirya):
Can

> Dataset with Java Beans for java.sql.Date throws CompileException
> -
>
> Key: SPARK-22935
> URL: https://issues.apache.org/jira/browse/SPARK-22935
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1, 2.3.0
>Reporter: Kazuaki Ishizaki
>Priority: Major
>
> The following code can throw an exception with or without whole-stage codegen.
> {code}
>   public void SPARK22935() {
> Dataset cdr = spark
> .read()
> .format("csv")
> .option("header", "true")
> .option("inferSchema", "true")
> .option("delimiter", ";")
> .csv("CDR_SAMPLE.csv")
> .as(Encoders.bean(CDR.class));
> Dataset ds = cdr.filter((FilterFunction) x -> (x.timestamp != 
> null));
> long c = ds.count();
> cdr.show(2);
> ds.show(2);
> System.out.println("cnt=" + c);
>   }
> // CDR.java
> public class CDR implements java.io.Serializable {
>   public java.sql.Date timestamp;
>   public java.sql.Date getTimestamp() { return this.timestamp; }
>   public void setTimestamp(java.sql.Date timestamp) { this.timestamp = 
> timestamp; }
> }
> // CDR_SAMPLE.csv
> timestamp
> 2017-10-29T02:37:07.815Z
> 2017-10-29T02:38:07.815Z
> {code}
> result
> {code}
> 12:17:10.352 ERROR 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to 
> compile: org.codehaus.commons.compiler.CompileException: File 
> 'generated.java', Line 61, Column 70: No applicable constructor/method found 
> for actual parameters "long"; candidates are: "public static java.sql.Date 
> org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 61, Column 70: No applicable constructor/method found for actual parameters 
> "long"; candidates are: "public static java.sql.Date 
> org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22935) Dataset with Java Beans for java.sql.Date throws CompileException

2018-01-17 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330207#comment-16330207
 ] 

Liang-Chi Hsieh commented on SPARK-22935:
-

Can

> Dataset with Java Beans for java.sql.Date throws CompileException
> -
>
> Key: SPARK-22935
> URL: https://issues.apache.org/jira/browse/SPARK-22935
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1, 2.3.0
>Reporter: Kazuaki Ishizaki
>Priority: Major
>
> The following code can throw an exception with or without whole-stage codegen.
> {code}
>   public void SPARK22935() {
> Dataset cdr = spark
> .read()
> .format("csv")
> .option("header", "true")
> .option("inferSchema", "true")
> .option("delimiter", ";")
> .csv("CDR_SAMPLE.csv")
> .as(Encoders.bean(CDR.class));
> Dataset ds = cdr.filter((FilterFunction) x -> (x.timestamp != 
> null));
> long c = ds.count();
> cdr.show(2);
> ds.show(2);
> System.out.println("cnt=" + c);
>   }
> // CDR.java
> public class CDR implements java.io.Serializable {
>   public java.sql.Date timestamp;
>   public java.sql.Date getTimestamp() { return this.timestamp; }
>   public void setTimestamp(java.sql.Date timestamp) { this.timestamp = 
> timestamp; }
> }
> // CDR_SAMPLE.csv
> timestamp
> 2017-10-29T02:37:07.815Z
> 2017-10-29T02:38:07.815Z
> {code}
> result
> {code}
> 12:17:10.352 ERROR 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to 
> compile: org.codehaus.commons.compiler.CompileException: File 
> 'generated.java', Line 61, Column 70: No applicable constructor/method found 
> for actual parameters "long"; candidates are: "public static java.sql.Date 
> org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 61, Column 70: No applicable constructor/method found for actual parameters 
> "long"; candidates are: "public static java.sql.Date 
> org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23021) AnalysisBarrier should not cut off the explain output for Parsed Logical Plan

2018-01-12 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323910#comment-16323910
 ] 

Liang-Chi Hsieh commented on SPARK-23021:
-

Overriding {{innerChildren}} sounds good to me. [~maropu] Please ping me when 
you submit the PR. Thanks.
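
Roughly what I understand the fix to be (a sketch against catalyst internals, 
with a renamed class so it is clearly not the real source):

{code}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

// Sketch: the barrier stays a leaf so the analyzer does not descend into it,
// but exposing the wrapped plan through innerChildren lets explain() keep
// printing the subtree instead of cutting the Parsed Logical Plan off.
case class AnalysisBarrierSketch(child: LogicalPlan) extends LeafNode {
  override def output: Seq[Attribute] = child.output
  override protected def innerChildren: Seq[LogicalPlan] = Seq(child)
}
{code}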

> AnalysisBarrier should not cut off the explain output for Parsed Logical Plan
> -
>
> Key: SPARK-23021
> URL: https://issues.apache.org/jira/browse/SPARK-23021
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Kris Mok
>
> In PR#20094 as a follow up to SPARK-20392, there were some fixes to the 
> handling of {{AnalysisBarrier}}, but there seem to be more cases that need to 
> be fixed.
> One such case is that right now the Parsed Logical Plan in explain output 
> would be cutoff by {{AnalysisBarrier}}, e.g.
> {code:none}
> scala> val df1 = spark.range(1).select('id as 'x, 'id + 1 as 
> 'y).repartition(1).select('x === 'y)
> df1: org.apache.spark.sql.DataFrame = [(x = y): boolean]
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [('x = 'y) AS (x = y)#22]
> +- AnalysisBarrier Repartition 1, true
> == Analyzed Logical Plan ==
> (x = y): boolean
> Project [(x#16L = y#17L) AS (x = y)#22]
> +- Repartition 1, true
>+- Project [id#13L AS x#16L, (id#13L + cast(1 as bigint)) AS y#17L]
>   +- Range (0, 1, step=1, splits=Some(8))
> == Optimized Logical Plan ==
> Project [(x#16L = y#17L) AS (x = y)#22]
> +- Repartition 1, true
>+- Project [id#13L AS x#16L, (id#13L + 1) AS y#17L]
>   +- Range (0, 1, step=1, splits=Some(8))
> == Physical Plan ==
> *Project [(x#16L = y#17L) AS (x = y)#22]
> +- Exchange RoundRobinPartitioning(1)
>+- *Project [id#13L AS x#16L, (id#13L + 1) AS y#17L]
>   +- *Range (0, 1, step=1, splits=8)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23042) Use OneHotEncoderModel to encode labels in MultilayerPerceptronClassifier

2018-01-11 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-23042:
---

 Summary: Use OneHotEncoderModel to encode labels in 
MultilayerPerceptronClassifier
 Key: SPARK-23042
 URL: https://issues.apache.org/jira/browse/SPARK-23042
 Project: Spark
  Issue Type: Bug
  Components: ML
Affects Versions: 2.3.0
Reporter: Liang-Chi Hsieh


In MultilayerPerceptronClassifier, we currently use an RDD operation to encode 
labels. I think we should use ML's OneHotEncoderEstimator/Model to do the encoding.
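
Roughly what the encoding could look like with the estimator (a sketch in terms 
of a helper function; the real change would live inside 
MultilayerPerceptronClassifier rather than user code):

{code}
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.sql.DataFrame

// Sketch: turn a numeric label column into one-hot vectors using the
// DataFrame-based estimator instead of a hand-rolled RDD transformation.
def encodeLabels(dataset: DataFrame, labelCol: String): DataFrame = {
  val encoder = new OneHotEncoderEstimator()
    .setInputCols(Array(labelCol))
    .setOutputCols(Array(s"${labelCol}_onehot"))
    .setDropLast(false)  // keep one output slot per class, as the MLP expects
  encoder.fit(dataset).transform(dataset)
}
{code}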



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage

2018-01-02 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh resolved SPARK-22898.
-
Resolution: Duplicate

> collect_set aggregation on bucketed table causes an exchange stage
> --
>
> Key: SPARK-22898
> URL: https://issues.apache.org/jira/browse/SPARK-22898
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Modi Tamam
>  Labels: bucketing
>
> I'm using Spark-2.2. I'm POCing Spark's bucketing. I've created a bucketed 
> table, here's the desc formatted my_bucketed_tbl output:
> +++---+
> |col_nam| data_type|comment|
> +++---+
> |  bundle| string|   null|
> | ifa| string|   null|
> |   date_|date|   null|
> |hour| int|   null|
> |||   |
> |# Detailed Table ...||   |
> |Database| default|   |
> |   Table| my_bucketed_tbl|
> |   Owner|zeppelin|   |
> | Created|Thu Dec 21 13:43:...|   |
> | Last Access|Thu Jan 01 00:00:...|   |
> |Type|EXTERNAL|   |
> |Provider| orc|   |
> | Num Buckets|  16|   |
> |  Bucket Columns| [`ifa`]|   |
> |Sort Columns| [`ifa`]|   |
> |Table Properties|[transient_lastDd...|   |
> |Location|hdfs:/user/hive/w...|   |
> |   Serde Library|org.apache.hadoop...|   |
> | InputFormat|org.apache.hadoop...|   |
> |OutputFormat|org.apache.hadoop...|   |
> |  Storage Properties|[serialization.fo...|   |
> +++---+
> When I'm executing an explain of a group by query, I can see that we've 
> spared the exchange phase :
> {code:java}
> sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain
> == Physical Plan ==
> SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
> +- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
>+- *Sort [ifa#932 ASC NULLS FIRST], false, 0
>   +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, 
> Format: ORC, Location: 
> InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct
> {code}
> But, when I replace Spark's max function with collect_set, I can see that the 
> execution plan is the same as a non-bucketed table, means, the exchange phase 
> is not spared :
> {code:java}
> sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by 
> ifa").explain
> == Physical Plan ==
> ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 
> 0)])
> +- Exchange hashpartitioning(ifa#1010, 200)
>+- ObjectHashAggregate(keys=[ifa#1010], 
> functions=[partial_collect_set(bundle#998, 0, 0)])
>   +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, 
> Format: ORC, Location: 
> InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage

2018-01-02 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308970#comment-16308970
 ] 

Liang-Chi Hsieh commented on SPARK-22898:
-

If there is no problem, I will resolve this as a duplicate. You can re-open it if 
you have other questions.

> collect_set aggregation on bucketed table causes an exchange stage
> --
>
> Key: SPARK-22898
> URL: https://issues.apache.org/jira/browse/SPARK-22898
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Modi Tamam
>  Labels: bucketing
>
> I'm using Spark-2.2. I'm POCing Spark's bucketing. I've created a bucketed 
> table, here's the desc formatted my_bucketed_tbl output:
> +++---+
> |col_nam| data_type|comment|
> +++---+
> |  bundle| string|   null|
> | ifa| string|   null|
> |   date_|date|   null|
> |hour| int|   null|
> |||   |
> |# Detailed Table ...||   |
> |Database| default|   |
> |   Table| my_bucketed_tbl|
> |   Owner|zeppelin|   |
> | Created|Thu Dec 21 13:43:...|   |
> | Last Access|Thu Jan 01 00:00:...|   |
> |Type|EXTERNAL|   |
> |Provider| orc|   |
> | Num Buckets|  16|   |
> |  Bucket Columns| [`ifa`]|   |
> |Sort Columns| [`ifa`]|   |
> |Table Properties|[transient_lastDd...|   |
> |Location|hdfs:/user/hive/w...|   |
> |   Serde Library|org.apache.hadoop...|   |
> | InputFormat|org.apache.hadoop...|   |
> |OutputFormat|org.apache.hadoop...|   |
> |  Storage Properties|[serialization.fo...|   |
> +++---+
> When I'm executing an explain of a group by query, I can see that we've 
> spared the exchange phase :
> {code:java}
> sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain
> == Physical Plan ==
> SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
> +- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
>+- *Sort [ifa#932 ASC NULLS FIRST], false, 0
>   +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, 
> Format: ORC, Location: 
> InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct
> {code}
> But, when I replace Spark's max function with collect_set, I can see that the 
> execution plan is the same as a non-bucketed table, means, the exchange phase 
> is not spared :
> {code:java}
> sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by 
> ifa").explain
> == Physical Plan ==
> ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 
> 0)])
> +- Exchange hashpartitioning(ifa#1010, 200)
>+- ObjectHashAggregate(keys=[ifa#1010], 
> functions=[partial_collect_set(bundle#998, 0, 0)])
>   +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, 
> Format: ORC, Location: 
> InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage

2018-01-01 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307591#comment-16307591
 ] 

Liang-Chi Hsieh edited comment on SPARK-22898 at 1/2/18 1:46 AM:
-

I think this should already be fixed by SPARK-3.

I did a test with the current master branch:

{code}
val df = {
  (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
}
df.write
  .format("parquet")
  .bucketBy(8, "j")
  .sortBy("j")
  .saveAsTable("bucketed_table")
sql("select j, max(k) from bucketed_table group by j").explain
sql("select j, collect_set(k) from bucketed_table group by j").explain  
{code}

{code}
== Physical Plan ==
SortAggregate(key=[j#4851], functions=[max(k#4852)])
+- SortAggregate(key=[j#4851], functions=[partial_max(k#4852)])
   +- *Sort [j#4851 ASC NULLS FIRST], false, 0
  +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, 
Format: Parquet, Location: InMemory
FileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], 
PartitionFilters: [], PushedFilters: [
], ReadSchema: struct

== Physical Plan ==
ObjectHashAggregate(keys=[j#4851], functions=[collect_set(k#4852, 0, 0)])
+- ObjectHashAggregate(keys=[j#4851], functions=[partial_collect_set(k#4852, 0, 
0)])
   +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, 
Format: Parquet, Location: InMemoryFil
eIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], 
PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct
{code}




was (Author: viirya):
I think this should already be fixed by SPARK-3.

I do a test with current master branch:

{code}
val df = {
  (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
}
df.write
  .format("parquet")
  .bucketBy(8, "j")
  .sortBy("j")
  .saveAsTable("bucketed_table")
sql("select j, max(k) from bucketed_table group by j").explain
sql("select j, collect_set(k) from bucketed_table group by j").explain  
{code}

{code}
SortAggregate(key=[j#4851], functions=[max(k#4852)])
+- SortAggregate(key=[j#4851], functions=[partial_max(k#4852)])
   +- *Sort [j#4851 ASC NULLS FIRST], false, 0
  +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, 
Format: Parquet, Location: InMemory
FileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], 
PartitionFilters: [], PushedFilters: [
], ReadSchema: struct
== Physical Plan ==
ObjectHashAggregate(keys=[j#4851], functions=[collect_set(k#4852, 0, 0)])
+- ObjectHashAggregate(keys=[j#4851], functions=[partial_collect_set(k#4852, 0, 
0)])
   +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, 
Format: Parquet, Location: InMemoryFil
eIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], 
PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct
{code}



> collect_set aggregation on bucketed table causes an exchange stage
> --
>
> Key: SPARK-22898
> URL: https://issues.apache.org/jira/browse/SPARK-22898
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Modi Tamam
>  Labels: bucketing
>
> I'm using Spark-2.2. I'm POCing Spark's bucketing. I've created a bucketed 
> table, here's the desc formatted my_bucketed_tbl output:
> +++---+
> |col_nam| data_type|comment|
> +++---+
> |  bundle| string|   null|
> | ifa| string|   null|
> |   date_|date|   null|
> |hour| int|   null|
> |||   |
> |# Detailed Table ...||   |
> |Database| default|   |
> |   Table| my_bucketed_tbl|
> |   Owner|zeppelin|   |
> | Created|Thu Dec 21 13:43:...|   |
> | Last Access|Thu Jan 01 00:00:...|   |
> |Type|EXTERNAL|   |
> |Provider| orc|   |
> | Num Buckets|  16|   |
> |  Bucket Columns| [`ifa`]|   |
> |Sort Columns| [`ifa`]|   |
> |Table Properties|[transient_lastDd...|   |
> |Location|hdfs:/user/hive/w...|   |
> |   Serde Library|org.apache.hadoop...|   |
> | InputFormat|org.apache.hadoop...|   |
> |OutputFormat|org.apache.hadoop...|

[jira] [Commented] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage

2018-01-01 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307591#comment-16307591
 ] 

Liang-Chi Hsieh commented on SPARK-22898:
-

I think this should already be fixed by SPARK-3.

I did a test with the current master branch:

{code}
val df = {
  (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
}
df.write
  .format("parquet")
  .bucketBy(8, "j")
  .sortBy("j")
  .saveAsTable("bucketed_table")
sql("select j, max(k) from bucketed_table group by j").explain
sql("select j, collect_set(k) from bucketed_table group by j").explain  
{code}

{code}
SortAggregate(key=[j#4851], functions=[max(k#4852)])
+- SortAggregate(key=[j#4851], functions=[partial_max(k#4852)])
   +- *Sort [j#4851 ASC NULLS FIRST], false, 0
  +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, 
Format: Parquet, Location: InMemory
FileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], 
PartitionFilters: [], PushedFilters: [
], ReadSchema: struct
== Physical Plan ==
ObjectHashAggregate(keys=[j#4851], functions=[collect_set(k#4852, 0, 0)])
+- ObjectHashAggregate(keys=[j#4851], functions=[partial_collect_set(k#4852, 0, 
0)])
   +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, 
Format: Parquet, Location: InMemoryFil
eIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], 
PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct
{code}



> collect_set aggregation on bucketed table causes an exchange stage
> --
>
> Key: SPARK-22898
> URL: https://issues.apache.org/jira/browse/SPARK-22898
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Modi Tamam
>  Labels: bucketing
>
> I'm using Spark-2.2. I'm POCing Spark's bucketing. I've created a bucketed 
> table, here's the desc formatted my_bucketed_tbl output:
> +++---+
> |col_nam| data_type|comment|
> +++---+
> |  bundle| string|   null|
> | ifa| string|   null|
> |   date_|date|   null|
> |hour| int|   null|
> |||   |
> |# Detailed Table ...||   |
> |Database| default|   |
> |   Table| my_bucketed_tbl|
> |   Owner|zeppelin|   |
> | Created|Thu Dec 21 13:43:...|   |
> | Last Access|Thu Jan 01 00:00:...|   |
> |Type|EXTERNAL|   |
> |Provider| orc|   |
> | Num Buckets|  16|   |
> |  Bucket Columns| [`ifa`]|   |
> |Sort Columns| [`ifa`]|   |
> |Table Properties|[transient_lastDd...|   |
> |Location|hdfs:/user/hive/w...|   |
> |   Serde Library|org.apache.hadoop...|   |
> | InputFormat|org.apache.hadoop...|   |
> |OutputFormat|org.apache.hadoop...|   |
> |  Storage Properties|[serialization.fo...|   |
> +++---+
> When I'm executing an explain of a group by query, I can see that we've 
> spared the exchange phase :
> {code:java}
> sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain
> == Physical Plan ==
> SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
> +- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
>+- *Sort [ifa#932 ASC NULLS FIRST], false, 0
>   +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, 
> Format: ORC, Location: 
> InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct
> {code}
> But, when I replace Spark's max function with collect_set, I can see that the 
> execution plan is the same as a non-bucketed table, means, the exchange phase 
> is not spared :
> {code:java}
> sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by 
> ifa").explain
> == Physical Plan ==
> ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 
> 0)])
> +- Exchange hashpartitioning(ifa#1010, 200)
>+- ObjectHashAggregate(keys=[ifa#1010], 
> functions=[partial_collect_set(bundle#998, 0, 0)])
>   +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, 
> Format: ORC, Lo

[jira] [Updated] (SPARK-22856) Add wrapper for codegen output and nullability

2017-12-21 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-22856:

Affects Version/s: (was: 2.2.1)
   2.3.0

> Add wrapper for codegen output and nullability
> --
>
> Key: SPARK-22856
> URL: https://issues.apache.org/jira/browse/SPARK-22856
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Liang-Chi Hsieh
>
> The codegen output of {{Expression}}, {{ExprCode}}, now encapsulates only 
> strings of output value ({{value}}) and nullability ({{isNull}}). It makes 
> difficulty for us to know what the output really is. I think it is better if 
> we can add wrappers for the value and nullability that let us to easily know 
> that.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22856) Add wrapper for codegen output and nullability

2017-12-21 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-22856:

Description: The codegen output of {{Expression}}, {{ExprCode}}, now 
encapsulates only strings of output value ({{value}}) and nullability 
({{isNull}}). It makes difficulty for us to know what the output really is. I 
think it is better if we can add wrappers for the value and nullability that 
let us to easily know that.  (was: The codegen output of {{Expression}}, 
{{ExprCode}}, now encapsulates only strings of output value {{value}} and 
nullability ({{isNull}}). It makes difficulty for us to know what the output 
really is. I think it is better if we can add wrappers for the value and 
nullability that let us to easily know that.)

> Add wrapper for codegen output and nullability
> --
>
> Key: SPARK-22856
> URL: https://issues.apache.org/jira/browse/SPARK-22856
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Liang-Chi Hsieh
>
> The codegen output of {{Expression}}, {{ExprCode}}, now encapsulates only 
> strings of output value ({{value}}) and nullability ({{isNull}}). It makes 
> difficulty for us to know what the output really is. I think it is better if 
> we can add wrappers for the value and nullability that let us to easily know 
> that.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22856) Add wrapper for codegen output and nullability

2017-12-20 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-22856:
---

 Summary: Add wrapper for codegen output and nullability
 Key: SPARK-22856
 URL: https://issues.apache.org/jira/browse/SPARK-22856
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.1
Reporter: Liang-Chi Hsieh


The codegen output of {{Expression}}, {{ExprCode}}, currently encapsulates only 
strings for the output value ({{value}}) and nullability ({{isNull}}). That makes 
it difficult to know what the output really is. I think it is better if we can 
add wrappers for the value and nullability that let us easily know that.
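
As a rough sketch of the direction (all names here are placeholders, not a 
proposed final API):

{code}
// Instead of bare strings, wrap the pieces of an expression's codegen output
// so consumers can tell a literal from a generated variable at a glance.
sealed trait ExprValueSketch { def code: String }
case class LiteralValue(code: String) extends ExprValueSketch   // e.g. "-1", "true", "false"
case class VariableValue(code: String) extends ExprValueSketch  // e.g. "value_3", "isNull_3"

case class ExprCodeSketch(
    code: String,               // statements that compute the result
    isNull: ExprValueSketch,    // a "false" literal vs. a generated boolean variable
    value: ExprValueSketch)     // a literal vs. a generated variable holding the value
{code}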



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22600) Fix 64kb limit for deeply nested expressions under wholestage codegen

2017-12-13 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290423#comment-16290423
 ] 

Liang-Chi Hsieh edited comment on SPARK-22600 at 12/14/17 6:59 AM:
---

The current approach proposes a new contract that Spark didn't promise before: 
{{Expression.genCode}} must output something that can be used as a parameter name 
or literal. If we output a Java expression that produces a value, such as 
{{var1 + 1}}, this approach will fail compilation because that expression can't 
be used as a parameter name.

Changing the expression to a generated parameter name would be difficult, as we 
already use the expression in the generated code.

If we accept this new contract, we should document it clearly and check whether 
any places use such an expression as codegen output.

The current approach is documented in the design doc. Please give me feedback 
if you have time to go through it. Thank you.

The design doc is posted at 
https://docs.google.com/document/d/1By_V-A2sxCWbP7dZ5EzHIuMSe8K0fQL9lqovGWXnsfs/edit?usp=sharing




was (Author: viirya):
The current approach proposes a new contract that Spark doesn't promise before: 
{{Expression.genCode}} must output something that can be used as parameter name 
or literal. If we output a java expression that can produce a value such as 
{{var1 + 1}}, because it can't be used as parameter name, this approach will 
fail the compilation.

To change the expression with a generated parameter name should be difficult as 
we already use the expression in generated code.

If we accept this new contract, we should document it clearly and check if any 
places use such expression as codegen output.

The current approach is documented in the design doc.

The design doc is posted at 
https://docs.google.com/document/d/1By_V-A2sxCWbP7dZ5EzHIuMSe8K0fQL9lqovGWXnsfs/edit?usp=sharing



> Fix 64kb limit for deeply nested expressions under wholestage codegen
> -
>
> Key: SPARK-22600
> URL: https://issues.apache.org/jira/browse/SPARK-22600
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Liang-Chi Hsieh
>Assignee: Liang-Chi Hsieh
> Fix For: 2.3.0
>
>
> This is an extension of SPARK-22543 to fix 64kb compile error for deeply 
> nested expressions under wholestage codegen.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22600) Fix 64kb limit for deeply nested expressions under wholestage codegen

2017-12-13 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290423#comment-16290423
 ] 

Liang-Chi Hsieh commented on SPARK-22600:
-

The current approach proposes a new contract that Spark didn't promise before: 
{{Expression.genCode}} must output something that can be used as a parameter name 
or literal. If we output a Java expression that produces a value, such as 
{{var1 + 1}}, this approach will fail compilation because that expression can't 
be used as a parameter name.

Changing the expression to a generated parameter name would be difficult, as we 
already use the expression in the generated code.

If we accept this new contract, we should document it clearly and check whether 
any places use such an expression as codegen output.

The current approach is documented in the design doc.

The design doc is posted at 
https://docs.google.com/document/d/1By_V-A2sxCWbP7dZ5EzHIuMSe8K0fQL9lqovGWXnsfs/edit?usp=sharing



> Fix 64kb limit for deeply nested expressions under wholestage codegen
> -
>
> Key: SPARK-22600
> URL: https://issues.apache.org/jira/browse/SPARK-22600
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Liang-Chi Hsieh
>Assignee: Liang-Chi Hsieh
> Fix For: 2.3.0
>
>
> This is an extension of SPARK-22543 to fix 64kb compile error for deeply 
> nested expressions under wholestage codegen.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22772) elt should use splitExpressionsWithCurrentInputs to split expression codes

2017-12-13 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-22772:
---

 Summary: elt should use splitExpressionsWithCurrentInputs to split 
expression codes
 Key: SPARK-22772
 URL: https://issues.apache.org/jira/browse/SPARK-22772
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.1
Reporter: Liang-Chi Hsieh


In SPARK-22550, elt was changed to use {{buildCodeBlocks}} to manually split 
expression codes. We should use {{splitExpressionsWithCurrentInputs}} to do 
that because it considers both normal codegen and wholestage codegen.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22715) Reuse array in CreateNamedStruct

2017-12-06 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh resolved SPARK-22715.
-
Resolution: Not A Problem

> Reuse array in CreateNamedStruct
> 
>
> Key: SPARK-22715
> URL: https://issues.apache.org/jira/browse/SPARK-22715
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Marco Gaido
>
> In CreateNamedStruct we are creating the array once for each record, while we 
> can reuse the same across them, reducing GC.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22660) Use position() and limit() to fix ambiguity issue in scala-2.12 and JDK9

2017-11-29 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272283#comment-16272283
 ] 

Liang-Chi Hsieh commented on SPARK-22660:
-

For the error you pinged me about: from the error message, it looks like you can 
try adding {{import scala.language.reflectiveCalls}}?

Btw, are we supporting JDK9?
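
For the ambiguity in the title itself, a small sketch of the idea (per the error 
message, the bare {{limit}} is ambiguous between {{Buffer.limit(): Int}} and 
{{ByteBuffer.limit(Int)}} under scala-2.12/JDK9, so we call the zero-arg 
accessor explicitly):

{code}
import java.nio.ByteBuffer

val serializedDirectResult: ByteBuffer = ByteBuffer.allocate(16)

// Writing the empty parens picks the zero-arg accessors unambiguously.
val resultSize: Int = serializedDirectResult.limit()
val pos: Int = serializedDirectResult.position()
{code}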

> Use position() and limit() to fix ambiguity issue in scala-2.12 and JDK9
> 
>
> Key: SPARK-22660
> URL: https://issues.apache.org/jira/browse/SPARK-22660
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.2.0
>Reporter: liyunzhang
>
> Based on SPARK-22659
> 1. compile with -Pscala-2.12 and get the error
> {code}
> Use position() and limit() to fix ambiguity issue
> {code}
> spark_source/core/src/main/scala/org/apache/spark/executor/Executor.scala:455:
>  ambiguous reference to overloaded definition, method limit in class 
> ByteBuffer of type (x$1: Int)java.nio.ByteBuffer
> method limit in class Buffer of type ()Int
> match expected type ?
>  val resultSize = serializedDirectResult.limit
> error 
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22600) Fix 64kb limit for deeply nested expressions under wholestage codegen

2017-11-24 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-22600:
---

 Summary: Fix 64kb limit for deeply nested expressions under 
wholestage codegen
 Key: SPARK-22600
 URL: https://issues.apache.org/jira/browse/SPARK-22600
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Liang-Chi Hsieh


This is an extension of SPARK-22543 to fix 64kb compile error for deeply nested 
expressions under wholestage codegen.
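
For a feel of the problem, something like the following spark-shell sketch 
(column name and nesting depth are arbitrary) builds one deeply nested 
expression whose generated code can blow past the 64kb method limit under 
wholestage codegen:

{code}
import org.apache.spark.sql.functions.col

// Fold many additions into a single deeply nested expression on one column.
val deep = (1 to 1000).foldLeft(col("c")) { (acc, _) => acc + 1 }

spark.range(10)
  .select(col("id").as("c"))
  .select(deep.as("deep"))
  .collect()   // the oversized method is generated when this plan is compiled
{code}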





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22591) GenerateOrdering shouldn't change ctx.INPUT_ROW

2017-11-23 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-22591:
---

 Summary: GenerateOrdering shouldn't change ctx.INPUT_ROW
 Key: SPARK-22591
 URL: https://issues.apache.org/jira/browse/SPARK-22591
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Liang-Chi Hsieh


{{GenerateOrdering}} changes {{ctx.INPUT_ROW}} but doesn't restore the original 
value. Since {{ctx.INPUT_ROW}} is used when generating codes, it is risky to 
change it arbitrarily.
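
The fix I have in mind is basically a save/restore around the generated 
comparisons, e.g. (a sketch of the pattern, not the actual patch):

{code}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

val ctx = new CodegenContext

// Remember the caller's INPUT_ROW, point it at the row the ordering code
// reads, and put the original value back once the code has been generated.
val oldInputRow = ctx.INPUT_ROW
ctx.INPUT_ROW = "i"
try {
  // ... generate the compare code for each SortOrder here ...
} finally {
  ctx.INPUT_ROW = oldInputRow
}
{code}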




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22551) Fix 64kb compile error for common expression types

2017-11-22 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh resolved SPARK-22551.
-
Resolution: Not A Problem

> Fix 64kb compile error for common expression types
> --
>
> Key: SPARK-22551
> URL: https://issues.apache.org/jira/browse/SPARK-22551
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Liang-Chi Hsieh
>
> For common expression types, such as {{BinaryExpression}} and 
> {{TernaryExpression}}, the combination of generated codes of children can 
> possibly be large. We should put the codes into functions to prevent possible 
> 64kb compile error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22551) Fix 64kb compile error for common expression types

2017-11-22 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263696#comment-16263696
 ] 

Liang-Chi Hsieh commented on SPARK-22551:
-

After SPARK-22543 was merged, I can't reproduce this issue. Hopefully it also 
solves this one; I'll close this as not a problem for now.

> Fix 64kb compile error for common expression types
> --
>
> Key: SPARK-22551
> URL: https://issues.apache.org/jira/browse/SPARK-22551
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Liang-Chi Hsieh
>
> For common expression types, such as {{BinaryExpression}} and 
> {{TernaryExpression}}, the combination of generated codes of children can 
> possibly be large. We should put the codes into functions to prevent possible 
> 64kb compile error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-20 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-22541:

Component/s: (was: Documentation)

> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> --
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
>  Issue Type: Documentation
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
>Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators i'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", 
> "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-20 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260240#comment-16260240
 ] 

Liang-Chi Hsieh commented on SPARK-22541:
-

Since this is known behavior, I will change this from bug to documentation.

> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> --
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
>Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators i'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", 
> "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-20 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-22541:

Issue Type: Documentation  (was: Bug)

> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> --
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
>  Issue Type: Documentation
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
>Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators i'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", 
> "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+++
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-20 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-22541:

Component/s: Documentation

> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> --
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation, PySpark
>Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
>Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators i'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", 
> "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+++
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-19 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258873#comment-16258873
 ] 

Liang-Chi Hsieh edited comment on SPARK-22541 at 11/20/17 7:14 AM:
---

This is similar to the case of using Python UDFs with conditional expressions. 
I think we should also explicitly note this behavior in the documentation.


was (Author: viirya):
Similar to the case of using Python UDFs with conditional expressions, I think 
we don't have an easy fix for this. 

Btw, I think we should also explicitly note this behavior in the documentation.

> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> --
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
>Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators i'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", 
> "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+++
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-19 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258873#comment-16258873
 ] 

Liang-Chi Hsieh commented on SPARK-22541:
-

Similar to the case of using Python UDFs with conditional expressions, I think 
we don't have an easy fix for this. 

Btw, I think we should also explicitly note this behavior in the documentation.

> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> --
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
>Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators i'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", 
> "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+++
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-19 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258868#comment-16258868
 ] 

Liang-Chi Hsieh edited comment on SPARK-22541 at 11/20/17 7:01 AM:
---

Sorry, my previous reply is not completely correct.

This behavior is also related to how PySpark runs Python UDFs. We can show the 
query plan of {{df}}:

{code}
...
== Physical Plan ==
*Project [key#0, val1#1L, val2#2L]
+- *Filter (pythonUDF0#21 && pythonUDF1#22)
   +- BatchEvalPython [myfilter1(val1#1L), myfilter2(val2#2L)], [key#0, val1#1L, val2#2L, pythonUDF0#21, pythonUDF1#22]
      +- Scan ExistingRDD[key#0,val1#1L,val2#2L]
{code}

The Python UDFs are pushed down to a special physical operator, 
{{BatchEvalPython}}, for execution. Due to implementation details, the 
pushed-down Python UDFs are not conditional. That is to say, they are evaluated 
on all rows, even when, logically, the original query only evaluates them on a 
subset of the rows via conditional expressions such as when or if. The issue you 
found here has the same cause.
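
A minimal PySpark sketch to check this (assuming the example DataFrame from this 
ticket; the plain lambdas only stand in for the accumulator-updating UDFs):

{code}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)],
                           ["key", "val1", "val2"])

# Stand-ins for the reporter's accumulator-updating filter UDFs.
my_udf1 = udf(lambda v: v < 2, BooleanType())
my_udf2 = udf(lambda v: v < 2, BooleanType())

filtered = df.filter(my_udf1(col("val1"))).filter(my_udf2(col("val2")))

# The physical plan shows a single BatchEvalPython node that evaluates both
# UDFs, followed by one combined Filter, so both UDFs see every input row.
filtered.explain()
{code}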






was (Author: viirya):
Sorry, my previous reply is not completely correct.

This behavior is related to how PySpark runs Python UDFs. We can show the query 
plan of {{df}}:

{code}
...
== Physical Plan ==
*Project [key#0, val1#1L, val2#2L]
+- *Filter (pythonUDF0#21 && pythonUDF1#22)
   +- BatchEvalPython [myfilter1(val1#1L), myfilter2(val2#2L)], [key#0, val1#1L, val2#2L, pythonUDF0#21, pythonUDF1#22]
      +- Scan ExistingRDD[key#0,val1#1L,val2#2L]
{code}

The Python UDFs are pushed down to a special physical operator, 
{{BatchEvalPython}}, for execution. Due to implementation details, the 
pushed-down Python UDFs are not conditional. That is to say, they are evaluated 
on all rows, even when, logically, the original query only evaluates them on a 
subset of the rows via conditional expressions such as when or if. The issue you 
found here has the same cause.





> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> --
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
>Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators i'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", 
> "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+++
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-19 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258868#comment-16258868
 ] 

Liang-Chi Hsieh commented on SPARK-22541:
-

Sorry, my previous reply is not completely correct.

This behavior is related to how PySpark runs Python UDFs. We can show the query 
plan of {{df}}:

{code}
...
== Physical Plan ==
*Project [key#0, val1#1L, val2#2L]
+- *Filter (pythonUDF0#21 && pythonUDF1#22)
   +- BatchEvalPython [myfilter1(val1#1L), myfilter2(val2#2L)], [key#0, val1#1L, val2#2L, pythonUDF0#21, pythonUDF1#22]
      +- Scan ExistingRDD[key#0,val1#1L,val2#2L]
{code}

The Python UDFs are pushed down to a special physical operator, 
{{BatchEvalPython}}, for execution. Due to implementation details, the 
pushed-down Python UDFs are not conditional. That is to say, they are evaluated 
on all rows, even when, logically, the original query only evaluates them on a 
subset of the rows via conditional expressions such as when or if. The issue you 
found here has the same cause.





> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> --
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
>Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators i'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", 
> "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+++
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22551) Fix 64kb compile error for common expression types

2017-11-18 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-22551:

Issue Type: Sub-task  (was: Bug)
Parent: SPARK-22510

> Fix 64kb compile error for common expression types
> --
>
> Key: SPARK-22551
> URL: https://issues.apache.org/jira/browse/SPARK-22551
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Liang-Chi Hsieh
>
> For common expression types, such as {{BinaryExpression}} and 
> {{TernaryExpression}}, the combined generated code of the children can be 
> large. We should put the code into separate functions to prevent a possible 
> 64KB compile error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22551) Fix 64kb compile error for common expression types

2017-11-18 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-22551:
---

 Summary: Fix 64kb compile error for common expression types
 Key: SPARK-22551
 URL: https://issues.apache.org/jira/browse/SPARK-22551
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Liang-Chi Hsieh


For common expression types, such as {{BinaryExpression}} and 
{{TernaryExpression}}, the combined generated code of the children can be 
large. We should put the code into separate functions to prevent a possible 
64KB compile error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20295) when spark.sql.adaptive.enabled is enabled, have conflict with Exchange Resue

2017-11-16 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256570#comment-16256570
 ] 

Liang-Chi Hsieh commented on SPARK-20295:
-

Btw, from the partial query plan you posted, it looks like coordinators 1, 2, 
and 3 each have just one exchange (numExchanges = 1). So I'm not sure whether 
you posted the correct query plan.

> when  spark.sql.adaptive.enabled is enabled, have conflict with Exchange Resue
> --
>
> Key: SPARK-20295
> URL: https://issues.apache.org/jira/browse/SPARK-20295
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL
>Affects Versions: 2.1.0
>Reporter: Ruhui Wang
>
> when run  tpcds-q95, and set  spark.sql.adaptive.enabled = true the physical 
> plan firstly:
> Sort
> :  +- Exchange(coordinator id: 1)
> : +- Project***
> ::-Sort **
> ::  +- Exchange(coordinator id: 2)
> :: :- Project ***
> :+- Sort
> ::  +- Exchange(coordinator id: 3)
>  spark.sql.exchange.reuse is opened, then physical plan will become below:
> Sort
> :  +- Exchange(coordinator id: 1)
> : +- Project***
> ::-Sort **
> ::  +- Exchange(coordinator id: 2)
> :: :- Project ***
> :+- Sort
> ::  +- ReusedExchange  Exchange(coordinator id: 2)
> If spark.sql.adaptive.enabled = true,  the code stack is : 
> ShuffleExchange#doExecute --> postShuffleRDD function --> 
> doEstimationIfNecessary . In this function, 
> assert(exchanges.length == numExchanges) will be error, as left side has only 
> one element, but right is equal to 2.
> If this is a bug of spark.sql.adaptive.enabled and exchange resue?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20295) when spark.sql.adaptive.enabled is enabled, have conflict with Exchange Resue

2017-11-16 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256546#comment-16256546
 ] 

Liang-Chi Hsieh commented on SPARK-20295:
-

Is this bug also present in 2.2? When adaptive execution is enabled, I don't 
think we get any exchange reuse in 2.2.

> when  spark.sql.adaptive.enabled is enabled, have conflict with Exchange Resue
> --
>
> Key: SPARK-20295
> URL: https://issues.apache.org/jira/browse/SPARK-20295
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL
>Affects Versions: 2.1.0
>Reporter: Ruhui Wang
>
> when run  tpcds-q95, and set  spark.sql.adaptive.enabled = true the physical 
> plan firstly:
> Sort
> :  +- Exchange(coordinator id: 1)
> : +- Project***
> ::-Sort **
> ::  +- Exchange(coordinator id: 2)
> :: :- Project ***
> :+- Sort
> ::  +- Exchange(coordinator id: 3)
>  spark.sql.exchange.reuse is opened, then physical plan will become below:
> Sort
> :  +- Exchange(coordinator id: 1)
> : +- Project***
> ::-Sort **
> ::  +- Exchange(coordinator id: 2)
> :: :- Project ***
> :+- Sort
> ::  +- ReusedExchange  Exchange(coordinator id: 2)
> If spark.sql.adaptive.enabled = true,  the code stack is : 
> ShuffleExchange#doExecute --> postShuffleRDD function --> 
> doEstimationIfNecessary . In this function, 
> assert(exchanges.length == numExchanges) will be error, as left side has only 
> one element, but right is equal to 2.
> If this is a bug of spark.sql.adaptive.enabled and exchange resue?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

2017-11-16 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256441#comment-16256441
 ] 

Liang-Chi Hsieh commented on SPARK-22541:
-

Due to query optimization, the two filters are combined and only one filter 
operation is actually performed. The filtered results are correct, so I don't 
think this is a bug.
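
As a sketch, the same counts can be obtained without accumulators (reusing the 
{{df}} from the example below; each count is a separate action, so the result 
does not depend on where or how often the UDFs are evaluated):

{code}
from pyspark.sql.functions import col

total = df.count()

df1 = df.filter(col("val1") < 2)
kept_after_first = df1.count()
dropped_by_first = total - kept_after_first          # 2 in the example

df2 = df1.filter(col("val2") < 2)
dropped_by_second = kept_after_first - df2.count()   # 0 in the example
{code}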

> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> --
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
>Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators i'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", 
> "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+++
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+++
> # |key|val1|val2|
> # +---+++
> # |  a|   1|   1|
> # +---+++
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22491) union all can't execute parallel with group by

2017-11-15 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16254431#comment-16254431
 ] 

Liang-Chi Hsieh commented on SPARK-22491:
-

For the query without aggregation, the exchanges are reused and there is only 
one exchange. For the query with aggregation, when adaptive execution is 
enabled, the coordinated exchanges are not reused for now, which results in 
three exchanges. For a similar case like this, the coordinated exchanges could 
be reused; I've opened a JIRA/PR, SPARK-22527, for this issue.
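
A sketch of the workaround from the ticket (assuming a registered {{table_01}} 
view with {{city_name}} and {{amount}} columns): with adaptive execution 
disabled, exchange reuse applies and the identical aggregation branches share 
one shuffle.

{code}
spark.conf.set("spark.sql.adaptive.enabled", "false")

spark.sql("""
    SELECT city_name, SUM(amount) FROM table_01 GROUP BY city_name
    UNION ALL
    SELECT city_name, SUM(amount) FROM table_01 GROUP BY city_name
""").explain()
# The physical plan should contain a ReusedExchange node for the second branch.
{code}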



> union all can't execute parallel with group by 
> ---
>
> Key: SPARK-22491
> URL: https://issues.apache.org/jira/browse/SPARK-22491
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: cen yuhai
>Priority: Minor
> Attachments: screenshot-1.png
>
>
> test sql
> set spark.sql.adaptive.enabled=true;
> {code}
> create table temp.test_yuhai_group 
> as 
> select 
>city_name
>,sum(amount)
>  from  temp.table_01 
>  group by city_name
>  union all
>  select 
>city_name
>,sum(amount)
>  from  temp.table_01  
>  group by city_name
>  union all
>  select city_name
>,sum(amount)
>  from  temp.table_01
>  group by city_name
> {code}
> if I remove group by ,it is ok
> {code}
> create table temp.test_yuhai_group 
> as 
> select 
>city_name
>  from  temp.table_01 
>  union all
>  select 
>city_name
>  from  temp.table_01  
>  union all
>  select city_name
>  from  temp.table_01
> {code}
> In the snapshot, the first time I execute ths sql, it run 3 job one by one 
> (200 tasks per job).
> If I remove group by, it run just one job (600 tasks per job).
> workaround:
> set spark.sql.adaptive.enabled=false;



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22527) Reuse coordinated ShuffleExchange if possible

2017-11-15 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-22527:
---

 Summary: Reuse coordinated ShuffleExchange if possible
 Key: SPARK-22527
 URL: https://issues.apache.org/jira/browse/SPARK-22527
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.0
Reporter: Liang-Chi Hsieh


A ShuffleExchange without a coordinator can be reused, but a coordinated 
ShuffleExchange currently can't be.

We should be able to reuse coordinated ShuffleExchanges too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22491) union all can't execute parallel with group by

2017-11-14 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16251194#comment-16251194
 ] 

Liang-Chi Hsieh commented on SPARK-22491:
-

If the aggregation is removed, there is actually no shuffle exchange, so the 
exchange coordinator won't intervene even if the config is enabled.
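
A quick way to see this (assuming a registered {{table_01}} view with a 
{{city_name}} column): without a GROUP BY the plan is just a Union over the 
scans, with no Exchange for the coordinator to manage.

{code}
spark.conf.set("spark.sql.adaptive.enabled", "true")

spark.sql("""
    SELECT city_name FROM table_01
    UNION ALL
    SELECT city_name FROM table_01
""").explain()
# No Exchange appears in the physical plan, so the coordinator has no effect here.
{code}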

> union all can't execute parallel with group by 
> ---
>
> Key: SPARK-22491
> URL: https://issues.apache.org/jira/browse/SPARK-22491
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: cen yuhai
>Priority: Minor
> Attachments: screenshot-1.png
>
>
> test sql
> set spark.sql.adaptive.enabled=true;
> {code}
> create table temp.test_yuhai_group 
> as 
> select 
>city_name
>,sum(amount)
>  from  temp.table_01 
>  group by city_name
>  union all
>  select 
>city_name
>,sum(amount)
>  from  temp.table_01  
>  group by city_name
>  union all
>  select city_name
>,sum(amount)
>  from  temp.table_01
>  group by city_name
> {code}
> if I remove group by ,it is ok
> {code}
> create table temp.test_yuhai_group 
> as 
> select 
>city_name
>  from  temp.table_01 
>  union all
>  select 
>city_name
>  from  temp.table_01  
>  union all
>  select city_name
>  from  temp.table_01
> {code}
> In the snapshot, the first time I execute ths sql, it run 3 job one by one 
> (200 tasks per job).
> If I remove group by, it run just one job (600 tasks per job).
> workaround:
> set spark.sql.adaptive.enabled=false;



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22442) Schema generated by Product Encoder doesn't match case class field name when using non-standard characters

2017-11-12 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-22442:

Fix Version/s: 2.2.1

> Schema generated by Product Encoder doesn't match case class field name when 
> using non-standard characters
> --
>
> Key: SPARK-22442
> URL: https://issues.apache.org/jira/browse/SPARK-22442
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.2, 2.2.0
>Reporter: Mikel San Vicente
>Assignee: Liang-Chi Hsieh
> Fix For: 2.2.1, 2.3.0
>
>
> Product encoder encodes special characters wrongly when field name contains 
> certain nonstandard characters.
> For example for:
> {quote}
> case class MyType(`field.1`: String, `field 2`: String)
> {quote}
> we will get the following schema
> {quote}
> root
>  |-- field$u002E1: string (nullable = true)
>  |-- field$u00202: string (nullable = true)
> {quote}
> As a consequence of this issue a DataFrame with the correct schema can't be 
> converted to a Dataset using .as[MyType]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22442) Schema generated by Product Encoder doesn't match case class field name when using non-standard characters

2017-11-12 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249086#comment-16249086
 ] 

Liang-Chi Hsieh commented on SPARK-22442:
-

cc [~felixcheung] This will be backported to 2.2.

> Schema generated by Product Encoder doesn't match case class field name when 
> using non-standard characters
> --
>
> Key: SPARK-22442
> URL: https://issues.apache.org/jira/browse/SPARK-22442
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.2, 2.2.0
>Reporter: Mikel San Vicente
>Assignee: Liang-Chi Hsieh
> Fix For: 2.2.1, 2.3.0
>
>
> Product encoder encodes special characters wrongly when field name contains 
> certain nonstandard characters.
> For example for:
> {quote}
> case class MyType(`field.1`: String, `field 2`: String)
> {quote}
> we will get the following schema
> {quote}
> root
>  |-- field$u002E1: string (nullable = true)
>  |-- field$u00202: string (nullable = true)
> {quote}
> As a consequence of this issue a DataFrame with the correct schema can't be 
> converted to a Dataset using .as[MyType]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22442) Schema generated by Product Encoder doesn't match case class field name when using non-standard characters

2017-11-12 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249043#comment-16249043
 ] 

Liang-Chi Hsieh commented on SPARK-22442:
-

I think it is easy. Let me prepare a backport PR for it.

> Schema generated by Product Encoder doesn't match case class field name when 
> using non-standard characters
> --
>
> Key: SPARK-22442
> URL: https://issues.apache.org/jira/browse/SPARK-22442
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.2, 2.2.0
>Reporter: Mikel San Vicente
>Assignee: Liang-Chi Hsieh
> Fix For: 2.3.0
>
>
> Product encoder encodes special characters wrongly when field name contains 
> certain nonstandard characters.
> For example for:
> {quote}
> case class MyType(`field.1`: String, `field 2`: String)
> {quote}
> we will get the following schema
> {quote}
> root
>  |-- field$u002E1: string (nullable = true)
>  |-- field$u00202: string (nullable = true)
> {quote}
> As a consequence of this issue a DataFrame with the correct schema can't be 
> converted to a Dataset using .as[MyType]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22460) Spark De-serialization of Timestamp field is Incorrect

2017-11-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245344#comment-16245344
 ] 

Liang-Chi Hsieh commented on SPARK-22460:
-

FYI, there is already a reported issue in spark-avro for this: 
https://github.com/databricks/spark-avro/issues/229.


> Spark De-serialization of Timestamp field is Incorrect
> --
>
> Key: SPARK-22460
> URL: https://issues.apache.org/jira/browse/SPARK-22460
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.1.1
>Reporter: Saniya Tech
>
> We are trying to serialize Timestamp fields to Avro using spark-avro 
> connector. I can see the Timestamp fields are getting correctly serialized as 
> long (milliseconds since Epoch). I verified that the data is correctly read 
> back from the Avro files. It is when we encode the Dataset as a case class 
> that timestamp field is incorrectly converted to a long value as seconds 
> since Epoch. As can be seen below, this shifts the timestamp many years in 
> the future.
> Code used to reproduce the issue:
> {code:java}
> import java.sql.Timestamp
> import com.databricks.spark.avro._
> import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
> case class TestRecord(name: String, modified: Timestamp)
> import spark.implicits._
> val data = Seq(
>   TestRecord("One", new Timestamp(System.currentTimeMillis()))
> )
> // Serialize:
> val parameters = Map("recordName" -> "TestRecord", "recordNamespace" -> 
> "com.example.domain")
> val path = s"s3a://some-bucket/output/"
> val ds = spark.createDataset(data)
> ds.write
>   .options(parameters)
>   .mode(SaveMode.Overwrite)
>   .avro(path)
> //
> // De-serialize
> val output = spark.read.avro(path).as[TestRecord]
> {code}
> Output from the test:
> {code:java}
> scala> data.head
> res4: TestRecord = TestRecord(One,2017-11-06 20:06:19.419)
> scala> output.collect().head
> res5: TestRecord = TestRecord(One,49819-12-16 17:23:39.0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22460) Spark De-serialization of Timestamp field is Incorrect

2017-11-08 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245158#comment-16245158
 ] 

Liang-Chi Hsieh edited comment on SPARK-22460 at 11/9/17 3:33 AM:
--

From the output of:
{code}
print(s"${rawOutput.collect().head}\n")
{code}

The modified field is already parsed as timestamp type.


was (Author: viirya):
From the output of print(s"${rawOutput.collect().head}\n"), the modified field 
is already parsed as timestamp type.

> Spark De-serialization of Timestamp field is Incorrect
> --
>
> Key: SPARK-22460
> URL: https://issues.apache.org/jira/browse/SPARK-22460
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.1.1
>Reporter: Saniya Tech
>
> We are trying to serialize Timestamp fields to Avro using spark-avro 
> connector. I can see the Timestamp fields are getting correctly serialized as 
> long (milliseconds since Epoch). I verified that the data is correctly read 
> back from the Avro files. It is when we encode the Dataset as a case class 
> that timestamp field is incorrectly converted to a long value as seconds 
> since Epoch. As can be seen below, this shifts the timestamp many years in 
> the future.
> Code used to reproduce the issue:
> {code:java}
> import java.sql.Timestamp
> import com.databricks.spark.avro._
> import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
> case class TestRecord(name: String, modified: Timestamp)
> import spark.implicits._
> val data = Seq(
>   TestRecord("One", new Timestamp(System.currentTimeMillis()))
> )
> // Serialize:
> val parameters = Map("recordName" -> "TestRecord", "recordNamespace" -> 
> "com.example.domain")
> val path = s"s3a://some-bucket/output/"
> val ds = spark.createDataset(data)
> ds.write
>   .options(parameters)
>   .mode(SaveMode.Overwrite)
>   .avro(path)
> //
> // De-serialize
> val output = spark.read.avro(path).as[TestRecord]
> {code}
> Output from the test:
> {code:java}
> scala> data.head
> res4: TestRecord = TestRecord(One,2017-11-06 20:06:19.419)
> scala> output.collect().head
> res5: TestRecord = TestRecord(One,49819-12-16 17:23:39.0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22460) Spark De-serialization of Timestamp field is Incorrect

2017-11-08 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245158#comment-16245158
 ] 

Liang-Chi Hsieh edited comment on SPARK-22460 at 11/9/17 3:32 AM:
--

From the output of print(s"${rawOutput.collect().head}\n"), the modified field 
is already parsed as timestamp type.


was (Author: viirya):
From the output of {{print(s"${rawOutput.collect().head}\n")}}, the modified 
field is already parsed as timestamp type.

> Spark De-serialization of Timestamp field is Incorrect
> --
>
> Key: SPARK-22460
> URL: https://issues.apache.org/jira/browse/SPARK-22460
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.1.1
>Reporter: Saniya Tech
>
> We are trying to serialize Timestamp fields to Avro using spark-avro 
> connector. I can see the Timestamp fields are getting correctly serialized as 
> long (milliseconds since Epoch). I verified that the data is correctly read 
> back from the Avro files. It is when we encode the Dataset as a case class 
> that timestamp field is incorrectly converted to a long value as seconds 
> since Epoch. As can be seen below, this shifts the timestamp many years in 
> the future.
> Code used to reproduce the issue:
> {code:java}
> import java.sql.Timestamp
> import com.databricks.spark.avro._
> import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
> case class TestRecord(name: String, modified: Timestamp)
> import spark.implicits._
> val data = Seq(
>   TestRecord("One", new Timestamp(System.currentTimeMillis()))
> )
> // Serialize:
> val parameters = Map("recordName" -> "TestRecord", "recordNamespace" -> 
> "com.example.domain")
> val path = s"s3a://some-bucket/output/"
> val ds = spark.createDataset(data)
> ds.write
>   .options(parameters)
>   .mode(SaveMode.Overwrite)
>   .avro(path)
> //
> // De-serialize
> val output = spark.read.avro(path).as[TestRecord]
> {code}
> Output from the test:
> {code:java}
> scala> data.head
> res4: TestRecord = TestRecord(One,2017-11-06 20:06:19.419)
> scala> output.collect().head
> res5: TestRecord = TestRecord(One,49819-12-16 17:23:39.0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22460) Spark De-serialization of Timestamp field is Incorrect

2017-11-08 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245158#comment-16245158
 ] 

Liang-Chi Hsieh commented on SPARK-22460:
-

From the output of {{print(s"${rawOutput.collect().head}\n")}}, the modified 
field is already parsed as timestamp type.

> Spark De-serialization of Timestamp field is Incorrect
> --
>
> Key: SPARK-22460
> URL: https://issues.apache.org/jira/browse/SPARK-22460
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.1.1
>Reporter: Saniya Tech
>
> We are trying to serialize Timestamp fields to Avro using spark-avro 
> connector. I can see the Timestamp fields are getting correctly serialized as 
> long (milliseconds since Epoch). I verified that the data is correctly read 
> back from the Avro files. It is when we encode the Dataset as a case class 
> that timestamp field is incorrectly converted to a long value as seconds 
> since Epoch. As can be seen below, this shifts the timestamp many years in 
> the future.
> Code used to reproduce the issue:
> {code:java}
> import java.sql.Timestamp
> import com.databricks.spark.avro._
> import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
> case class TestRecord(name: String, modified: Timestamp)
> import spark.implicits._
> val data = Seq(
>   TestRecord("One", new Timestamp(System.currentTimeMillis()))
> )
> // Serialize:
> val parameters = Map("recordName" -> "TestRecord", "recordNamespace" -> 
> "com.example.domain")
> val path = s"s3a://some-bucket/output/"
> val ds = spark.createDataset(data)
> ds.write
>   .options(parameters)
>   .mode(SaveMode.Overwrite)
>   .avro(path)
> //
> // De-serialize
> val output = spark.read.avro(path).as[TestRecord]
> {code}
> Output from the test:
> {code:java}
> scala> data.head
> res4: TestRecord = TestRecord(One,2017-11-06 20:06:19.419)
> scala> output.collect().head
> res5: TestRecord = TestRecord(One,49819-12-16 17:23:39.0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22460) Spark De-serialization of Timestamp field is Incorrect

2017-11-08 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16243834#comment-16243834
 ] 

Liang-Chi Hsieh commented on SPARK-22460:
-

In Spark SQL, when casting a long field to a timestamp field, the long value is 
interpreted as seconds. Although a timestamp field is also stored internally as 
a long, that value is interpreted as microseconds. That is to say, you can't 
directly cast a long field holding microseconds/milliseconds to a timestamp 
field and get a correct timestamp...

Currently spark-avro doesn't support conversion from Avro's long type to 
Catalyst's Date/Timestamp types, even with an explicitly given schema, e.g., 
{{spark.read.schema(dataSchema).avro(path)}}. To get correct Date/Timestamp 
fields, I think this should be fixed in spark-avro by adding support for the 
Date/Timestamp data types. That would enable spark-avro to interpret the loaded 
data as correct date/timestamp fields.
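
A small PySpark sketch of the cast semantics described above (the literal is a 
millisecond value close to the one in the report):

{code}
# CAST(numeric AS TIMESTAMP) interprets the value as seconds since the epoch.
spark.sql("SELECT CAST(1509998779419 AS TIMESTAMP)").show(truncate=False)
# -> a date around the year 49819, i.e. the symptom reported above

# Dividing the milliseconds by 1000 first yields the intended instant.
spark.sql("SELECT CAST(1509998779419 / 1000 AS TIMESTAMP)").show(truncate=False)
# -> 2017-11-06 20:06:19.419 (in the session time zone)
{code}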


> Spark De-serialization of Timestamp field is Incorrect
> --
>
> Key: SPARK-22460
> URL: https://issues.apache.org/jira/browse/SPARK-22460
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.1.1
>Reporter: Saniya Tech
>
> We are trying to serialize Timestamp fields to Avro using spark-avro 
> connector. I can see the Timestamp fields are getting correctly serialized as 
> long (milliseconds since Epoch). I verified that the data is correctly read 
> back from the Avro files. It is when we encode the Dataset as a case class 
> that timestamp field is incorrectly converted to a long value as seconds 
> since Epoch. As can be seen below, this shifts the timestamp many years in 
> the future.
> Code used to reproduce the issue:
> {code:java}
> import java.sql.Timestamp
> import com.databricks.spark.avro._
> import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
> case class TestRecord(name: String, modified: Timestamp)
> import spark.implicits._
> val data = Seq(
>   TestRecord("One", new Timestamp(System.currentTimeMillis()))
> )
> // Serialize:
> val parameters = Map("recordName" -> "TestRecord", "recordNamespace" -> 
> "com.example.domain")
> val path = s"s3a://some-bucket/output/"
> val ds = spark.createDataset(data)
> ds.write
>   .options(parameters)
>   .mode(SaveMode.Overwrite)
>   .avro(path)
> //
> // De-serialize
> val output = spark.read.avro(path).as[TestRecord]
> {code}
> Output from the test:
> {code:java}
> scala> data.head
> res4: TestRecord = TestRecord(One,2017-11-06 20:06:19.419)
> scala> output.collect().head
> res5: TestRecord = TestRecord(One,49819-12-16 17:23:39.0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2017-11-05 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239897#comment-16239897
 ] 

Liang-Chi Hsieh commented on SPARK-22446:
-

For this special case, the simplest workaround is to set {{handleInvalid}} to 
keep. Actually, the other predicate {{isnotnull(_3#5)}} can filter the row out, 
provided the UDF doesn't raise an error once {{handleInvalid}} is set to keep.

The problem happens in the optimizer when pushing predicates down through a 
projection. For Catalyst expressions, being applied to supposedly filtered-out 
data is not a problem, because the other predicates still filter it out.

UDFs are a special case because they can throw a runtime exception when applied 
to unexpected data, so it is not always safe to push such predicates down.

However, because not all UDFs are unsafe to push down, we may not want to 
disable pushdown for all UDF predicates. For now, I think we should mark such 
UDFs as non-deterministic and disable pushdown for them.
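
A sketch of that workaround in PySpark (assuming Spark 2.2+ and a 
{{dfNoBristol}} DataFrame equivalent to the one in the report):

{code}
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="CITY", outputCol="CITYIndexed",
                        handleInvalid="keep")
model = indexer.fit(dfNoBristol)
dfWithIndex = model.transform(dfNoBristol)

# The pushed-down indexer UDF may still see the filtered-out "Bristol" row,
# but with handleInvalid="keep" unseen labels map to an extra index instead
# of throwing, and the remaining predicates drop that row afterwards.
dfWithIndex.filter(dfWithIndex.CITYIndexed == 1.0).count()
{code}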



> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> ---
>
> Key: SPARK-22446
> URL: https://issues.apache.org/jira/browse/SPARK-22446
> Project: Spark
>  Issue Type: Bug
>  Components: ML, Optimizer
>Affects Versions: 2.0.0, 2.2.0
> Environment: spark-shell, local mode, macOS Sierra 10.12.6
>Reporter: Greg Bellchambers
>
> In the following, the `indexer` UDF defined inside the 
> `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an 
> "Unseen label" error, despite the label not being present in the transformed 
> DataFrame.
> Here is the definition of the indexer UDF in the transform method:
> {code:java}
> val indexer = udf { label: String =>
>   if (labelToIndex.contains(label)) {
> labelToIndex(label)
>   } else {
> throw new SparkException(s"Unseen label: $label.")
>   }
> }
> {code}
> We can demonstrate the error with a very simple example DataFrame.
> {code:java}
> scala> import org.apache.spark.ml.feature.StringIndexer
> import org.apache.spark.ml.feature.StringIndexer
> scala> // first we create a DataFrame with three cities
> scala> val df = List(
>  | ("A", "London", "StrA"),
>  | ("B", "Bristol", null),
>  | ("C", "New York", "StrC")
>  | ).toDF("ID", "CITY", "CONTENT")
> df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more 
> field]
> scala> df.show
> +---++---+
> | ID|CITY|CONTENT|
> +---++---+
> |  A|  London|   StrA|
> |  B| Bristol|   null|
> |  C|New York|   StrC|
> +---++---+
> scala> // then we remove the row with null in CONTENT column, which removes 
> Bristol
> scala> val dfNoBristol = finalStatic.filter($"CONTENT".isNotNull)
> dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: 
> string, CITY: string ... 1 more field]
> scala> dfNoBristol.show
> +---++---+
> | ID|CITY|CONTENT|
> +---++---+
> |  A|  London|   StrA|
> |  C|New York|   StrC|
> +---++---+
> scala> // now create a StringIndexer for the CITY column and fit to 
> dfNoBristol
> scala> val model = {
>  | new StringIndexer()
>  | .setInputCol("CITY")
>  | .setOutputCol("CITYIndexed")
>  | .fit(dfNoBristol)
>  | }
> model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa2fb
> scala> // the StringIndexerModel has only two labels: "London" and "New York"
> scala> str.labels foreach println
> London
> New York
> scala> // transform our DataFrame to add an index column
> scala> val dfWithIndex = model.transform(dfNoBristol)
> dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 
> more fields]
> scala> dfWithIndex.show
> +---++---+---+
> | ID|CITY|CONTENT|CITYIndexed|
> +---++---+---+
> |  A|  London|   StrA|0.0|
> |  C|New York|   StrC|1.0|
> +---++---+---+
> {code}
> The unexpected behaviour comes when we filter `dfWithIndex` for `CITYIndexed` 
> equal to 1.0 and perform an action. The `indexer` UDF in `transform` method 
> throws an exception reporting unseen label "Bristol". This is irrational 
> behaviour as far as the user of the API is concerned, because there is no 
> such value as "Bristol" when do show all rows of `dfWithIndex`:
> {code:java}
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
> 17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
> org.apache.spark.SparkException: Failed to execute user defined 
> function($anonfun$5: (string) => double)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$Ge

[jira] [Commented] (SPARK-22427) StackOverFlowError when using FPGrowth

2017-11-05 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239824#comment-16239824
 ] 

Liang-Chi Hsieh commented on SPARK-22427:
-

From a rough glance, it looks like the error wasn't thrown inside ml.FPGrowth?

{code}
...
at org.apache.spark.sql.Dataset.count(Dataset.scala:2429)
at DataMining.FPGrowth$.runJob(FPGrowth.scala:116)
at DataMining.testFPG$.main(FPGrowth.scala:36)
at DataMining.testFPG.main(FPGrowth.scala)
...
{code}

> StackOverFlowError when using FPGrowth
> --
>
> Key: SPARK-22427
> URL: https://issues.apache.org/jira/browse/SPARK-22427
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib
>Affects Versions: 2.2.0
> Environment: Centos Linux 3.10.0-327.el7.x86_64
> java 1.8.0.111
> spark 2.2.0
>Reporter: lyt
>
> code part:
> val path = jobConfig.getString("hdfspath")
> val vectordata = sc.sparkContext.textFile(path)
> val finaldata = sc.createDataset(vectordata.map(obj => {
>   obj.split(" ")
> }).filter(arr => arr.length > 0)).toDF("items")
> val fpg = new FPGrowth()
> 
> fpg.setMinSupport(minSupport).setItemsCol("items").setMinConfidence(minConfidence)
> val train = fpg.fit(finaldata)
> print(train.freqItemsets.count())
> print(train.associationRules.count())
> train.save("/tmp/FPGModel")
> And encountered following exception:
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
>   at scala.Option.foreach(Option.scala:257)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
>   at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
>   at 
> org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2430)
>   at 
> org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2429)
>   at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
>   at org.apache.spark.sql.Dataset.count(Dataset.scala:2429)
>   at DataMining.FPGrowth$.runJob(FPGrowth.scala:116)
>   at DataMining.testFPG$.main(FPGrowth.scala:36)
>   at DataMining.testFPG.main(FPGrowth.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>   at 
> org.apache.spark.

[jira] [Commented] (SPARK-22442) Schema generated by Product Encoder doesn't match case class field name when using non-standard characters

2017-11-05 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239815#comment-16239815
 ] 

Liang-Chi Hsieh commented on SPARK-22442:
-

I tried on the latest master branch. It works with {{as[MyType]}}.

{code}
scala> val df = Seq(MyType("a", "b"), MyType("c", "d")).toDF
df: org.apache.spark.sql.DataFrame = [field$u002E1: string, field$u00202: 
string]
scala> df.as[MyType].collect
res7: Array[MyType] = Array(MyType(a,b), MyType(c,d))
{code}



> Schema generated by Product Encoder doesn't match case class field name when 
> using non-standard characters
> --
>
> Key: SPARK-22442
> URL: https://issues.apache.org/jira/browse/SPARK-22442
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.2, 2.2.0
>Reporter: Mikel San Vicente
>
> Product encoder encodes special characters wrongly when field name contains 
> certain nonstandard characters.
> For example for:
> {quote}
> case class MyType(`field.1`: String, `field 2`: String)
> {quote}
> we will get the following schema
> {quote}
> root
>  |-- field$u002E1: string (nullable = true)
>  |-- field$u00202: string (nullable = true)
> {quote}
> As a consequence of this issue a DataFrame with the correct schema can't be 
> converted to a Dataset using .as[MyType]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22398) Partition directories with leading 0s cause wrong results

2017-11-05 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239810#comment-16239810
 ] 

Liang-Chi Hsieh commented on SPARK-22398:
-

As we can control it with the config 
`spark.sql.sources.partitionColumnTypeInference.enabled`, I think we can close 
this.
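
For reference, a sketch of the workaround (assuming the {{/tmp/bug1}} layout
written by the repro below): with inference disabled the partition column stays
a string, the leading zero is preserved, and the equality and IN predicates agree.

{code}
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

val df = spark.read.parquet("/tmp/bug1")
df.printSchema()                    // id is read as a string column
df.where("id = '01'").show()
df.where("id IN ('01')").show()     // returns the same row as the equality filter
{code}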

> Partition directories with leading 0s cause wrong results
> -
>
> Key: SPARK-22398
> URL: https://issues.apache.org/jira/browse/SPARK-22398
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Bogdan Raducanu
>
> Repro case:
> {code}
> spark.range(8).selectExpr("'0' || cast(id as string) as id", "id as 
> b").write.mode("overwrite").partitionBy("id").parquet("/tmp/bug1")
> spark.read.parquet("/tmp/bug1").where("id in ('01')").show
> +---+---+
> |  b| id|
> +---+---+
> +---+---+
> spark.read.parquet("/tmp/bug1").where("id = '01'").show
> +---+---+
> |  b| id|
> +---+---+
> |  1|  1|
> +---+---+
> {code}
> I think somewhere there is some special handling of this case for equals but 
> not the same for IN.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22398) Partition directories with leading 0s cause wrong results

2017-11-01 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235083#comment-16235083
 ] 

Liang-Chi Hsieh commented on SPARK-22398:
-

[~mgaido], I'd prefer to treat them as integers by default, because you can
easily disable `partitionColumnTypeInference` to read them as strings. But if
we treated them as strings during inference, you couldn't get them back as
integers by disabling the inference.

> Partition directories with leading 0s cause wrong results
> -
>
> Key: SPARK-22398
> URL: https://issues.apache.org/jira/browse/SPARK-22398
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Bogdan Raducanu
>Priority: Major
>
> Repro case:
> {code}
> spark.range(8).selectExpr("'0' || cast(id as string) as id", "id as 
> b").write.mode("overwrite").partitionBy("id").parquet("/tmp/bug1")
> spark.read.parquet("/tmp/bug1").where("id in ('01')").show
> +---+---+
> |  b| id|
> +---+---+
> +---+---+
> spark.read.parquet("/tmp/bug1").where("id = '01'").show
> +---+---+
> |  b| id|
> +---+---+
> |  1|  1|
> +---+---+
> {code}
> I think somewhere there is some special handling of this case for equals but 
> not the same for IN.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22406) pyspark version tag is wrong on PyPi

2017-10-31 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233700#comment-16233700
 ] 

Liang-Chi Hsieh commented on SPARK-22406:
-

cc [~holdenk]

> pyspark version tag is wrong on PyPi
> 
>
> Key: SPARK-22406
> URL: https://issues.apache.org/jira/browse/SPARK-22406
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Kerrick Staley
>Priority: Minor
>
> On pypi.python.org, the pyspark package is tagged with version 
> {{2.2.0.post0}}: https://pypi.python.org/pypi/pyspark/2.2.0
> However, when you install the package, it has version {{2.2.0}}.
> This has really annoying consequences: if you try {{pip install 
> pyspark==2.2.0}}, it won't work. Instead you have to do {{pip install 
> pyspark==2.2.0.post0}}. Then, if you later run the same command ({{pip 
> install pyspark==2.2.0.post0}}), it won't recognize the existing pyspark 
> installation (because it has version {{2.2.0}}) and instead will reinstall 
> it, which is very slow because pyspark is a large package.
> This can happen if you add a new package to a {{requirements.txt}} file; you 
> end up waiting a lot longer than necessary because every time you run {{pip 
> install -r requirements.txt}} it reinstalls pyspark.
> Can you please change the package on PyPi to have the version {{2.2.0}}?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-30 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-22347:

Issue Type: Documentation  (was: Bug)

> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Documentation
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example on how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
> def fn(value): return 10 / int(value)
> return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division by zero error, even if `F.when` is trying to filter 
> out all cases where `x <= 0`. I believe the correct behavior should be not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly enough, when the `F.when` condition is set to `F.lit(False)`, 
> then the error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-11215) Add multiple columns support to StringIndexer

2017-10-30 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224975#comment-16224975
 ] 

Liang-Chi Hsieh commented on SPARK-11215:
-

Hi [~WeichenXu123], I'd like to know whether you are busy these days; if so, I
could work on it. Please let me know. Thanks.

> Add multiple columns support to StringIndexer
> -
>
> Key: SPARK-11215
> URL: https://issues.apache.org/jira/browse/SPARK-11215
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Reporter: Yanbo Liang
>Assignee: Yanbo Liang
>
> Add multiple columns support to StringIndexer, then users can transform 
> multiple input columns to multiple output columns simultaneously. See 
> discussion SPARK-8418.
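
For context, a sketch of what this would change in practice. The multi-column
setters in the commented-out part are only the proposed API shape, not an
existing API; the rest assumes a spark-shell style session with a {{spark}}
SparkSession.

{code}
import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(Seq(("a", "x"), ("b", "y"), ("a", "y"))).toDF("c1", "c2")

// Today: one StringIndexer per column, fitted and applied one at a time.
val indexed = Seq("c1", "c2").foldLeft(df) { (cur, col) =>
  new StringIndexer()
    .setInputCol(col)
    .setOutputCol(s"${col}_idx")
    .fit(cur)
    .transform(cur)
}
indexed.show()

// Proposed: a single indexer handling all columns in one pass (hypothetical setters).
// new StringIndexer()
//   .setInputCols(Array("c1", "c2"))
//   .setOutputCols(Array("c1_idx", "c2_idx"))
{code}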



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22291) Postgresql UUID[] to Cassandra: Conversion Error

2017-10-29 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224247#comment-16224247
 ] 

Liang-Chi Hsieh commented on SPARK-22291:
-

Thanks [~hyukjin.kwon].

> Postgresql UUID[] to Cassandra: Conversion Error
> 
>
> Key: SPARK-22291
> URL: https://issues.apache.org/jira/browse/SPARK-22291
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.2.0
> Environment: Debian Linux, Scala 2.11, Spark 2.2.0, PostgreSQL 9.6, 
> Cassandra 3
>Reporter: Fabio J. Walter
>Assignee: Jen-Ming Chung
>  Labels: patch, postgresql, sql
> Fix For: 2.3.0
>
> Attachments: 
> org_apache_spark_sql_execution_datasources_jdbc_JdbcUtil.png
>
>
> My job reads data from a PostgreSQL table that contains columns of user_ids 
> uuid[] type, so that I'm getting the error above when I'm trying to save data 
> on Cassandra.
> However, the creation of this same table on Cassandra works fine!  user_ids 
> list.
> I can't change the type on the source table, because I'm reading data from a 
> legacy system.
> I've been looking at point printed on log, on class 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.scala
> Stacktrace on Spark:
> {noformat}
> Caused by: java.lang.ClassCastException: [Ljava.util.UUID; cannot be cast to 
> [Ljava.lang.String;
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:443)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:442)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13$$anonfun$18.apply(JdbcUtils.scala:472)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13$$anonfun$18.apply(JdbcUtils.scala:472)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$nullSafeConvert(JdbcUtils.scala:482)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13.apply(JdbcUtils.scala:470)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13.apply(JdbcUtils.scala:469)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:330)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:312)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at 
> org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
> at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
> at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
> at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
> at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Exec

[jira] [Commented] (SPARK-22291) Postgresql UUID[] to Cassandra: Conversion Error

2017-10-29 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224245#comment-16224245
 ] 

Liang-Chi Hsieh commented on SPARK-22291:
-

[~cloud_fan] The Assignee should be [~jmchung]. Thanks.

> Postgresql UUID[] to Cassandra: Conversion Error
> 
>
> Key: SPARK-22291
> URL: https://issues.apache.org/jira/browse/SPARK-22291
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.2.0
> Environment: Debian Linux, Scala 2.11, Spark 2.2.0, PostgreSQL 9.6, 
> Cassandra 3
>Reporter: Fabio J. Walter
>Assignee: Fabio J. Walter
>  Labels: patch, postgresql, sql
> Fix For: 2.3.0
>
> Attachments: 
> org_apache_spark_sql_execution_datasources_jdbc_JdbcUtil.png
>
>
> My job reads data from a PostgreSQL table that contains columns of user_ids 
> uuid[] type, so that I'm getting the error above when I'm trying to save data 
> on Cassandra.
> However, the creation of this same table on Cassandra works fine!  user_ids 
> list.
> I can't change the type on the source table, because I'm reading data from a 
> legacy system.
> I've been looking at point printed on log, on class 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.scala
> Stacktrace on Spark:
> {noformat}
> Caused by: java.lang.ClassCastException: [Ljava.util.UUID; cannot be cast to 
> [Ljava.lang.String;
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:443)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:442)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13$$anonfun$18.apply(JdbcUtils.scala:472)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13$$anonfun$18.apply(JdbcUtils.scala:472)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$nullSafeConvert(JdbcUtils.scala:482)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13.apply(JdbcUtils.scala:470)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13.apply(JdbcUtils.scala:469)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:330)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:312)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at 
> org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
> at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
> at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
> at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
> at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> 

[jira] [Comment Edited] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-27 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221612#comment-16221612
 ] 

Liang-Chi Hsieh edited comment on SPARK-22347 at 10/27/17 2:30 PM:
---

Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch or else values in a CaseWhen expression. The execution of 
batch/vectorized Python UDFs evaluates all the UDFs in an operator at once, so 
it might not be easy to make it support conditional execution. -I'd rather 
like to disable the usage of Python UDFs in CaseWhen. I think it can be very 
easy to incorporate the condition logic of CaseWhen into the Python UDFs, e.g. 
for the above example:- (note: this would break compatibility, so we might 
still need to support it)

{code}
def divideByValue():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x))
df2.show()
+-----+
|fn(x)|
+-----+
|    2|
| null|
+-----+
{code}



was (Author: viirya):
Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or else value in CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates the UDFs in an operator at 
once. It might not be easy to let it support conditional execution. I'd rather 
like to disable the usage of Python UDFs in CaseWhen. I think it can be very 
easy to incorporate the condition logic of CaseWhen into the Python UDFs, e.g. 
for the above example:

{code}
def divideByValue():
def fn(value): return 10 / int(value) if (value > 0) else None
return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x))
df2.show()
+-+
|fn(x)|
+-+
|2|
| null|
+-+
{code}


> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example on how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
> def fn(value): return 10 / int(value)
> return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division by zero error, even if `F.when` is trying to filter 
> out all cases where `x <= 0`. I believe the correct behavior should be not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly enough, when the `F.when` condition is set to `F.lit(False)`, 
> then the error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221612#comment-16221612
 ] 

Liang-Chi Hsieh edited comment on SPARK-22347 at 10/27/17 5:06 AM:
---

Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or else value in CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates the UDFs in an operator at 
once. It might not be easy to let it support conditional execution. I'd rather 
like to disable the usage of Python UDFs in CaseWhen. I think it can be very 
easy to incorporate the condition logic of CaseWhen into the Python UDFs, e.g. 
for the above example:

{code}
def divideByValue():
def fn(value): return 10 / int(value) if (value > 0) else None
return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x))
df2.show()
+-+
|fn(x)|
+-+
|2|
| null|
+-+
{code}



was (Author: viirya):
Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or else value in CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates the UDFs in an operator at 
once. It might not be easy to let it support conditional execution. I'd rather 
like to disable the usage of Python UDFs in CaseWhen. I think it can be very 
easy to incorporate the condition logic of CaseWhen into the Python UDFs, e.g. 
for the above example:

{code}
def divideByValue():
def fn(value): return 10 / int(value) if (value > 0) else None
return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x)))
df2.show()
++
|CASE WHEN (x > 0) THEN fn(x) END|
++
|   2|
|null|
++
{code}


> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example on how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
> def fn(value): return 10 / int(value)
> return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division by zero error, even if `F.when` is trying to filter 
> out all cases where `x <= 0`. I believe the correct behavior should be not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly enough, when the `F.when` condition is set to `F.lit(False)`, 
> then the error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221612#comment-16221612
 ] 

Liang-Chi Hsieh edited comment on SPARK-22347 at 10/27/17 5:05 AM:
---

Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or else value in CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates the UDFs in an operator at 
once. It might not be easy to let it support conditional execution. I'd rather 
like to disable the usage of Python UDFs in CaseWhen. I think it can be very 
easy to incorporate the condition logic of CaseWhen into the Python UDFs, e.g. 
for the above example:

{code}
def divideByValue():
def fn(value): return 10 / int(value) if (value > 0) else None
return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x)))
df2.show()
++
|CASE WHEN (x > 0) THEN fn(x) END|
++
|   2|
|null|
++
{code}



was (Author: viirya):
Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or else value in CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates the UDFs in an operator at 
once. It might not be easy to let it support conditional execution. I'd rather 
like to disable the usage of Python UDFs in CaseWhen. I think it can be very 
easy to incorporate the condition logic of CaseWhen into the Python UDFs, e.g. 
for the above example:

{code}
def Divide10():
def fn(value): return 10 / int(value) if (value > 0) else None
return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
++
|CASE WHEN (x > 0) THEN fn(x) END|
++
|   2|
|null|
++
{code}


> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example on how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
> def fn(value): return 10 / int(value)
> return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division by zero error, even if `F.when` is trying to filter 
> out all cases where `x <= 0`. I believe the correct behavior should be not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly enough, when the `F.when` condition is set to `F.lit(False)`, 
> then the error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221612#comment-16221612
 ] 

Liang-Chi Hsieh edited comment on SPARK-22347 at 10/27/17 2:44 AM:
---

Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or else value in CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates the UDFs in an operator at 
once. It might not be easy to let it support conditional execution. I'd rather 
like to disable the usage of Python UDFs in CaseWhen. I think it can be very 
easy to incorporate the condition logic of CaseWhen into the Python UDFs, e.g. 
for the above example:

{code}
def Divide10():
def fn(value): return 10 / int(value) if (value > 0) else None
return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
++
|CASE WHEN (x > 0) THEN fn(x) END|
++
|   2|
|null|
++
{code}



was (Author: viirya):
Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or else value in CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates the UDFs in an operator at 
once. It might not be easy to let it support conditional execution. I'd rather 
disable the usage of Python UDFs in CaseWhen. I think it can be very easy to 
incorporate the condition logic of CaseWhen into the Python UDFs, e.g. for the 
above example:

{code}
def Divide10():
def fn(value): return 10 / int(value) if (value > 0) else None
return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
++
|CASE WHEN (x > 0) THEN fn(x) END|
++
|   2|
|null|
++
{code}


> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example on how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
> def fn(value): return 10 / int(value)
> return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division by zero error, even if `F.when` is trying to filter 
> out all cases where `x <= 0`. I believe the correct behavior should be not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly enough, when the `F.when` condition is set to `F.lit(False)`, 
> then the error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221612#comment-16221612
 ] 

Liang-Chi Hsieh commented on SPARK-22347:
-

Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or else value in CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates the UDFs in an operator at 
once. It might not be easy to let it support conditional execution. I'd rather 
disable the usage of Python UDFs in CaseWhen. I think it can be very easy to 
incorporate the condition logic of CaseWhen into the Python UDFs, e.g. for the 
above example:

{code}
def Divide10():
def fn(value): return 10 / int(value) if (value > 0) else None
return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
++
|CASE WHEN (x > 0) THEN fn(x) END|
++
|   2|
|null|
++
{code}
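
The same guard-inside-the-UDF idea, written as a Scala UDF for comparison; just
a sketch, assuming a spark-shell session so {{spark.implicits._}} is already in
scope:

{code}
import org.apache.spark.sql.functions.udf

// Returning Option[Int] makes None come back as SQL NULL.
val divide10 = udf { value: Int =>
  if (value > 0) Some(10 / value) else None
}

val df = Seq(5, 0).toDF("x")
df.select(divide10($"x")).show()
{code}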


> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example on how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
> def fn(value): return 10 / int(value)
> return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division by zero error, even if `F.when` is trying to filter 
> out all cases where `x <= 0`. I believe the correct behavior should be not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly enough, when the `F.when` condition is set to `F.lit(False)`, 
> then the error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22335) Union for DataSet uses column order instead of types for union

2017-10-25 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218547#comment-16218547
 ] 

Liang-Chi Hsieh edited comment on SPARK-22335 at 10/25/17 1:18 PM:
---

IMHO, the concept of the {{union}} API in Dataset is tied to the row
representation, instead of typed objects. The current behavior follows the SQL
standard of resolving columns by position. Another advantage of tying this API
to the row representation is that we don't need to deserialize the rows back to
typed objects when performing the union.

A version of {{union}} that satisfies the need for typed objects would have to
unify the column order when doing a union on two Datasets. That would break the
SQL standard way of union, IIUC.

We could deserialize to typed objects and do the union, which is basically
doing the union with the RDD API. But that would be a significant performance
regression.

Considering those factors, I think a {{union}} API like the current one, which
acts in the SQL standard way, is the most acceptable. We also provide the
{{unionByName}} API, which works well in that scenario. We should also consider
API compatibility, so I don't think we have enough reasons to change the
{{union}} API semantics.






was (Author: viirya):
IMHO, the concept of {{union}} API in Dataset is tied to row representation, 
instead of typed objects. This current behavior follows the standard in SQL to 
resolve columns by position. Another advantage to tie with row representation 
in this API is we don't need to deserialize the rows back to typed objects when 
performing union.

A version of {{union}} that works in that way to satisfy the need for typed 
objects, would be like to unify the column order when doing union on two 
Datasets. That would break the SQL standard way of union, IIUC.

We can deserialize to typed objects and do union which is basically to do union 
with RDD API. But it should be a lot performance regression.

Considering those factors, I think an {{union}} API like current one which acts 
in SQL standard way is most acceptable. We also provide {{unionByName}} API 
which can work well in that scenario.





> Union for DataSet uses column order instead of types for union
> --
>
> Key: SPARK-22335
> URL: https://issues.apache.org/jira/browse/SPARK-22335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Carlos Bribiescas
>
> I see union uses column order for a DF. This to me is "fine" since they 
> aren't typed.
> However, for a dataset which is supposed to be strongly typed it is actually 
> giving the wrong result. If you try to access the members by name, it will 
> use the order. Heres is a reproducible case. 2.2.0
> {code:java}
>   case class AB(a : String, b : String)
>   val abDf = sc.parallelize(List(("aThing","bThing"))).toDF("a", "b")
>   val baDf = sc.parallelize(List(("bThing","aThing"))).toDF("b", "a")
>   
>   abDf.union(baDf).show() // as linked ticket states, its "Not a problem"
>   
>   val abDs = abDf.as[AB]
>   val baDs = baDf.as[AB]
>   
>   abDs.union(baDs).show()  // This gives wrong result since a Dataset[AB] 
> should be correctly mapped by type, not by column order
>   
>   abDs.union(baDs).map(_.a).show() // This gives wrong result since a 
> Dataset[AB] should be correctly mapped by type, not by column order
>abDs.union(baDs).rdd.take(2) // This also gives wrong result
>   baDs.map(_.a).show() // However, this gives the correct result, even though 
> columns were out of order.
>   abDs.map(_.a).show() // This is correct too
>   baDs.select("a","b").as[AB].union(abDs).show() // This is the same 
> workaround for linked issue, slightly modified.  However this seems wrong 
> since its supposed to be strongly typed
>   
>   baDs.rdd.toDF().as[AB].union(abDs).show()  // This however gives correct 
> result, which is logically inconsistent behavior
>   abDs.rdd.union(baDs.rdd).toDF().show() // Simpler example that gives 
> correct result
> {code}
> So its inconsistent and a bug IMO.  And I'm not sure that the suggested work 
> around is really fair, since I'm supposed to be getting of type `AB`.  More 
> importantly I think the issue is bigger when you consider that it happens 
> even if you read from parquet (as you would expect).  And that its 
> inconsistent when going to/from rdd.
> I imagine its just lazily converting to typed DS instead of initially.  So 
> either that typing could be prioritized to happen before the union or 
> unioning of DF could be done with column order taken into account.  Again, 
> this is speculation..



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr

[jira] [Commented] (SPARK-22335) Union for DataSet uses column order instead of types for union

2017-10-25 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218547#comment-16218547
 ] 

Liang-Chi Hsieh commented on SPARK-22335:
-

IMHO, the concept of {{union}} API in Dataset is tied to row representation, 
instead of typed objects. This current behavior follows the standard in SQL to 
resolve columns by position. Another advantage to tie with row representation 
in this API is we don't need to deserialize the rows back to typed objects when 
performing union.

A version of {{union}} that works in that way to satisfy the need for typed 
objects, would be like to unify the column order when doing union on two 
Datasets. That would break the SQL standard way of union, IIUC.

We can deserialize to typed objects and do union which is basically to do union 
with RDD API. But it should be a lot performance regression.

Considering those factors, I think an {{union}} API like current one which acts 
in SQL standard way is most acceptable. We also provide {{unionByName}} API 
which can work well in that scenario.





> Union for DataSet uses column order instead of types for union
> --
>
> Key: SPARK-22335
> URL: https://issues.apache.org/jira/browse/SPARK-22335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Carlos Bribiescas
>
> I see union uses column order for a DF. This to me is "fine" since they 
> aren't typed.
> However, for a dataset which is supposed to be strongly typed it is actually 
> giving the wrong result. If you try to access the members by name, it will 
> use the order. Heres is a reproducible case. 2.2.0
> {code:java}
>   case class AB(a : String, b : String)
>   val abDf = sc.parallelize(List(("aThing","bThing"))).toDF("a", "b")
>   val baDf = sc.parallelize(List(("bThing","aThing"))).toDF("b", "a")
>   
>   abDf.union(baDf).show() // as linked ticket states, its "Not a problem"
>   
>   val abDs = abDf.as[AB]
>   val baDs = baDf.as[AB]
>   
>   abDs.union(baDs).show()  // This gives wrong result since a Dataset[AB] 
> should be correctly mapped by type, not by column order
>   
>   abDs.union(baDs).map(_.a).show() // This gives wrong result since a 
> Dataset[AB] should be correctly mapped by type, not by column order
>abDs.union(baDs).rdd.take(2) // This also gives wrong result
>   baDs.map(_.a).show() // However, this gives the correct result, even though 
> columns were out of order.
>   abDs.map(_.a).show() // This is correct too
>   baDs.select("a","b").as[AB].union(abDs).show() // This is the same 
> workaround for linked issue, slightly modified.  However this seems wrong 
> since its supposed to be strongly typed
>   
>   baDs.rdd.toDF().as[AB].union(abDs).show()  // This however gives correct 
> result, which is logically inconsistent behavior
>   abDs.rdd.union(baDs.rdd).toDF().show() // Simpler example that gives 
> correct result
> {code}
> So its inconsistent and a bug IMO.  And I'm not sure that the suggested work 
> around is really fair, since I'm supposed to be getting of type `AB`.  More 
> importantly I think the issue is bigger when you consider that it happens 
> even if you read from parquet (as you would expect).  And that its 
> inconsistent when going to/from rdd.
> I imagine its just lazily converting to typed DS instead of initially.  So 
> either that typing could be prioritized to happen before the union or 
> unioning of DF could be done with column order taken into account.  Again, 
> this is speculation..



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22335) Union for DataSet uses column order instead of types for union

2017-10-24 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218027#comment-16218027
 ] 

Liang-Chi Hsieh commented on SPARK-22335:
-

[~CBribiescas] The column position in the schema of a Dataset doesn't 
necessarily match the fields in the typed objects. The documentation of {{as}} 
explains this:

{code}
Returns a new Dataset where each record has been mapped on to the specified 
type. The
method used to map columns depend on the type of `U`:
 - When `U` is a class, fields for the class will be mapped to columns of the 
same name
   (case sensitivity is determined by `spark.sql.caseSensitive`).
{code}

{{unionByName}} resolves columns by name. For typed objects, I think it can 
satisfy this kind of usage.

I remember this is not the first ticket opened about union behavior on 
Datasets. The first thing we might do is describe this better in the 
documentation of {{union}}.



> Union for DataSet uses column order instead of types for union
> --
>
> Key: SPARK-22335
> URL: https://issues.apache.org/jira/browse/SPARK-22335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Carlos Bribiescas
>
> I see union uses column order for a DF. This to me is "fine" since they 
> aren't typed.
> However, for a dataset which is supposed to be strongly typed it is actually 
> giving the wrong result. If you try to access the members by name, it will 
> use the order. Heres is a reproducible case. 2.2.0
> {code:java}
>   case class AB(a : String, b : String)
>   val abDf = sc.parallelize(List(("aThing","bThing"))).toDF("a", "b")
>   val baDf = sc.parallelize(List(("bThing","aThing"))).toDF("b", "a")
>   
>   abDf.union(baDf).show() // as linked ticket states, its "Not a problem"
>   
>   val abDs = abDf.as[AB]
>   val baDs = baDf.as[AB]
>   
>   abDs.union(baDs).show()  // This gives wrong result since a Dataset[AB] 
> should be correctly mapped by type, not by column order
>   
>   abDs.union(baDs).map(_.a).show() // This gives wrong result since a 
> Dataset[AB] should be correctly mapped by type, not by column order
>abDs.union(baDs).rdd.take(2) // This also gives wrong result
>   baDs.map(_.a).show() // However, this gives the correct result, even though 
> columns were out of order.
>   abDs.map(_.a).show() // This is correct too
>   baDs.select("a","b").as[AB].union(abDs).show() // This is the same 
> workaround for linked issue, slightly modified.  However this seems wrong 
> since its supposed to be strongly typed
>   
>   baDs.rdd.toDF().as[AB].union(abDs).show()  // This however gives correct 
> result, which is logically inconsistent behavior
>   abDs.rdd.union(baDs.rdd).toDF().show() // Simpler example that gives 
> correct result
> {code}
> So its inconsistent and a bug IMO.  And I'm not sure that the suggested work 
> around is really fair, since I'm supposed to be getting of type `AB`.  More 
> importantly I think the issue is bigger when you consider that it happens 
> even if you read from parquet (as you would expect).  And that its 
> inconsistent when going to/from rdd.
> I imagine its just lazily converting to typed DS instead of initially.  So 
> either that typing could be prioritized to happen before the union or 
> unioning of DF could be done with column order taken into account.  Again, 
> this is speculation..



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22348) The table cache providing ColumnarBatch should also do partition batch pruning

2017-10-24 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-22348:
---

 Summary: The table cache providing ColumnarBatch should also do 
partition batch pruning
 Key: SPARK-22348
 URL: https://issues.apache.org/jira/browse/SPARK-22348
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Liang-Chi Hsieh


We now enable the table cache {{InMemoryTableScanExec}} to provide 
{{ColumnarBatch}}, but the cached batches are retrieved without pruning. In 
this case, we still need to do partition batch pruning.
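
A sketch of the scenario this targets, assuming a spark-shell session: with
batch-level min/max statistics, a selective filter on a cached relation should
be able to skip whole cached batches instead of decoding every
{{ColumnarBatch}}.

{code}
val t = spark.range(0, 1000000).selectExpr("id", "id % 100 AS key")
t.createOrReplaceTempView("t")

spark.table("t").cache()
spark.table("t").count()   // materialize the in-memory columnar cache

// Only a few cached batches can possibly contain id < 10; with batch pruning
// they are the only ones that need to be scanned.
spark.sql("SELECT count(*) FROM t WHERE id < 10").show()
{code}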



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22335) Union for DataSet uses column order instead of types for union

2017-10-23 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16216248#comment-16216248
 ] 

Liang-Chi Hsieh edited comment on SPARK-22335 at 10/24/17 3:36 AM:
---

Can't {{unionByName}} solve it? 

{code}
scala> abDs.unionByName(baDs).show()
+------+------+
|     a|     b|
+------+------+
|aThing|bThing|
|aThing|bThing|
+------+------+
{code}



was (Author: viirya):
Can't {{unionByName}} solve it? 

{code]
scala> abDs.unionByName(baDs).show()
+--+--+
| a| b|
+--+--+
|aThing|bThing|
|aThing|bThing|
+--+--+
{code}


> Union for DataSet uses column order instead of types for union
> --
>
> Key: SPARK-22335
> URL: https://issues.apache.org/jira/browse/SPARK-22335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Carlos Bribiescas
>
> I see union uses column order for a DF. This to me is "fine" since they 
> aren't typed.
> However, for a dataset which is supposed to be strongly typed it is actually 
> giving the wrong result. If you try to access the members by name, it will 
> use the order. Heres is a reproducible case. 2.2.0
> {code:java}
>   case class AB(a : String, b : String)
>   val abDf = sc.parallelize(List(("aThing","bThing"))).toDF("a", "b")
>   val baDf = sc.parallelize(List(("bThing","aThing"))).toDF("b", "a")
>   
>   abDf.union(baDf).show() // as linked ticket states, its "Not a problem"
>   
>   val abDs = abDf.as[AB]
>   val baDs = baDf.as[AB]
>   
>   abDs.union(baDs).show()  // This gives wrong result since a Dataset[AB] 
> should be correctly mapped by type, not by column order
>   
>   abDs.union(baDs).map(_.a).show() // This gives wrong result since a 
> Dataset[AB] should be correctly mapped by type, not by column order
>abDs.union(baDs).rdd.take(2) // This also gives wrong result
>   baDs.map(_.a).show() // However, this gives the correct result, even though 
> columns were out of order.
>   abDs.map(_.a).show() // This is correct too
>   baDs.select("a","b").as[AB].union(abDs).show() // This is the same 
> workaround for linked issue, slightly modified.  However this seems wrong 
> since its supposed to be strongly typed
>   
>   baDs.rdd.toDF().as[AB].union(abDs).show()  // This however gives correct 
> result, which is logically inconsistent behavior
>   abDs.rdd.union(baDs.rdd).toDF().show() // Simpler example that gives 
> correct result
> {code}
> So its inconsistent and a bug IMO.  And I'm not sure that the suggested work 
> around is really fair, since I'm supposed to be getting of type `AB`.  More 
> importantly I think the issue is bigger when you consider that it happens 
> even if you read from parquet (as you would expect).  And that its 
> inconsistent when going to/from rdd.
> I imagine its just lazily converting to typed DS instead of initially.  So 
> either that typing could be prioritized to happen before the union or 
> unioning of DF could be done with column order taken into account.  Again, 
> this is speculation..



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22335) Union for DataSet uses column order instead of types for union

2017-10-23 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16216248#comment-16216248
 ] 

Liang-Chi Hsieh commented on SPARK-22335:
-

Can't {{unionByName}} solve it? 

{code]
scala> abDs.unionByName(baDs).show()
+--+--+
| a| b|
+--+--+
|aThing|bThing|
|aThing|bThing|
+--+--+
{code}


> Union for DataSet uses column order instead of types for union
> --
>
> Key: SPARK-22335
> URL: https://issues.apache.org/jira/browse/SPARK-22335
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Carlos Bribiescas
>
> I see union uses column order for a DF. This to me is "fine" since they 
> aren't typed.
> However, for a dataset which is supposed to be strongly typed it is actually 
> giving the wrong result. If you try to access the members by name, it will 
> use the order. Heres is a reproducible case. 2.2.0
> {code:java}
>   case class AB(a : String, b : String)
>   val abDf = sc.parallelize(List(("aThing","bThing"))).toDF("a", "b")
>   val baDf = sc.parallelize(List(("bThing","aThing"))).toDF("b", "a")
>   
>   abDf.union(baDf).show() // as linked ticket states, its "Not a problem"
>   
>   val abDs = abDf.as[AB]
>   val baDs = baDf.as[AB]
>   
>   abDs.union(baDs).show()  // This gives wrong result since a Dataset[AB] 
> should be correctly mapped by type, not by column order
>   
>   abDs.union(baDs).map(_.a).show() // This gives wrong result since a 
> Dataset[AB] should be correctly mapped by type, not by column order
>abDs.union(baDs).rdd.take(2) // This also gives wrong result
>   baDs.map(_.a).show() // However, this gives the correct result, even though 
> columns were out of order.
>   abDs.map(_.a).show() // This is correct too
>   baDs.select("a","b").as[AB].union(abDs).show() // This is the same 
> workaround for linked issue, slightly modified.  However this seems wrong 
> since its supposed to be strongly typed
>   
>   baDs.rdd.toDF().as[AB].union(abDs).show()  // This however gives correct 
> result, which is logically inconsistent behavior
>   abDs.rdd.union(baDs.rdd).toDF().show() // Simpler example that gives 
> correct result
> {code}
> So its inconsistent and a bug IMO.  And I'm not sure that the suggested work 
> around is really fair, since I'm supposed to be getting of type `AB`.  More 
> importantly I think the issue is bigger when you consider that it happens 
> even if you read from parquet (as you would expect).  And that its 
> inconsistent when going to/from rdd.
> I imagine its just lazily converting to typed DS instead of initially.  So 
> either that typing could be prioritized to happen before the union or 
> unioning of DF could be done with column order taken into account.  Again, 
> this is speculation..



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22291) Postgresql UUID[] to Cassandra: Conversion Error

2017-10-19 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212154#comment-16212154
 ] 

Liang-Chi Hsieh commented on SPARK-22291:
-

Could you send a PR for this?

> Postgresql UUID[] to Cassandra: Conversion Error
> 
>
> Key: SPARK-22291
> URL: https://issues.apache.org/jira/browse/SPARK-22291
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.2.0
> Environment: Debian Linux, Scala 2.11, Spark 2.2.0, PostgreSQL 9.6, 
> Cassandra 3
>Reporter: Fabio J. Walter
>  Labels: patch, postgresql, sql
> Attachments: 
> org_apache_spark_sql_execution_datasources_jdbc_JdbcUtil.png
>
>
> My job reads data from a PostgreSQL table that contains columns of user_ids 
> uuid[] type, so that I'm getting the error above when I'm trying to save data 
> on Cassandra.
> However, the creation of this same table on Cassandra works fine!  user_ids 
> list.
> I can't change the type on the source table, because I'm reading data from a 
> legacy system.
> I've been looking at point printed on log, on class 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.scala
> Stacktrace on Spark:
> {noformat}
> Caused by: java.lang.ClassCastException: [Ljava.util.UUID; cannot be cast to 
> [Ljava.lang.String;
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:443)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:442)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13$$anonfun$18.apply(JdbcUtils.scala:472)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13$$anonfun$18.apply(JdbcUtils.scala:472)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$nullSafeConvert(JdbcUtils.scala:482)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13.apply(JdbcUtils.scala:470)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$13.apply(JdbcUtils.scala:469)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:330)
> at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:312)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at 
> org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
> at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
> at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
> at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
> at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at 
> java.util.concu
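
A hedged sketch of one possible workaround on the reading side (table, column, and connection details are placeholders): cast the {{uuid[]}} column to {{text[]}} inside the JDBC subquery so that JdbcUtils only ever sees strings.

{code}
// Hypothetical example: push the uuid[] -> text[] cast down into PostgreSQL.
val query =
  """(SELECT id, user_ids::text[] AS user_ids
    |   FROM legacy_schema.some_table) AS t""".stripMargin

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/db")   // placeholder connection details
  .option("dbtable", query)
  .option("user", "reader")
  .option("password", "secret")
  .load()
{code}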

[jira] [Commented] (SPARK-22296) CodeGenerator - failed to compile when constructor has scala.collection.mutable.Seq vs. scala.collection.Seq

2017-10-19 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212123#comment-16212123
 ] 

Liang-Chi Hsieh commented on SPARK-22296:
-

Seems no problem with 2.2?

{code}
scala> case class TestData(x: Int, s: String, seq: 
scala.collection.mutable.Seq[Int])
defined class TestData

scala> val ds = Seq(TestData(1, "a", scala.collection.mutable.Seq.empty[Int]), 
TestData(2, "b", scala.collection.mutable.Seq(1, 2))).toDS

scala> ds.show
+---+---+------+
|  x|  s|   seq|
+---+---+------+
|  1|  a|    []|
|  2|  b|[1, 2]|
+---+---+------+

scala> ds.printSchema
root
 |-- x: integer (nullable = false)
 |-- s: string (nullable = true)
 |-- seq: array (nullable = true)
 |    |-- element: integer (containsNull = false)

scala> ds.select("seq")
res4: org.apache.spark.sql.DataFrame = [seq: array<int>]

scala> ds.select("seq").show
+------+
|   seq|
+------+
|    []|
|[1, 2]|
+------+
{code}

> CodeGenerator - failed to compile when constructor has 
> scala.collection.mutable.Seq vs. scala.collection.Seq
> 
>
> Key: SPARK-22296
> URL: https://issues.apache.org/jira/browse/SPARK-22296
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.1.0
>Reporter: Randy Tidd
>
> This is with Scala 2.11.
> We have a case class that has a constructor with 85 args, the last two of 
> which are:
>  var chargesInst : 
> scala.collection.mutable.Seq[ChargeInstitutional] = 
> scala.collection.mutable.Seq.empty[ChargeInstitutional],
>  var chargesProf : 
> scala.collection.mutable.Seq[ChargeProfessional] = 
> scala.collection.mutable.Seq.empty[ChargeProfessional]
> A mutable Seq in the constructor of a case class is probably poor form, but 
> Scala allows it. When we run this job we get this error:
> build   17-Oct-2017 05:30:502017-10-17 09:30:50 [Executor task launch 
> worker-1] ERROR o.a.s.s.c.e.codegen.CodeGenerator - failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 8217, Column 70: No applicable constructor/method found for actual parameters 
> "java.lang.String, java.lang.String, long, java.lang.String, long, long, 
> long, java.lang.String, long, long, double, scala.Option, scala.Option, 
> java.lang.String, java.lang.String, long, java.lang.String, java.lang.String, 
> java.lang.String, java.lang.String, java.lang.String, java.lang.String, int, 
> java.lang.String, java.lang.String, java.lang.String, java.lang.String, 
> java.lang.String, java.lang.String, java.lang.String, java.lang.String, 
> java.lang.String, java.lang.String, long, long, long, long, long, 
> scala.Option, scala.Option, scala.Option, scala.Option, scala.Option, 
> java.lang.String, java.lang.String, java.lang.String, java.lang.String, 
> java.lang.String, java.lang.String, long, java.lang.String, int, double, 
> double, java.lang.String, java.lang.String, java.lang.String, long, 
> java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, 
> long, long, long, java.lang.String, com.xyz.xyz.xyz.domain.Patient, 
> com.xyz.xyz.xyz.domain.Physician, scala.collection.Seq, scala.collection.Seq, 
> java.lang.String, long, java.lang.String, int, int, boolean, boolean, 
> scala.collection.Seq, boolean, scala.collection.Seq, boolean, 
> scala.collection.Seq, scala.collection.Seq"; candidates are: 
> "com.xyz.xyz.xyz.domain.Account(java.lang.String, java.lang.String, long, 
> java.lang.String, long, long, long, java.lang.String, long, long, double, 
> scala.Option, scala.Option, java.lang.String, java.lang.String, long, 
> java.lang.String, java.lang.String, java.lang.String, java.lang.String, 
> java.lang.String, java.lang.String, int, java.lang.String, java.lang.String, 
> java.lang.String, java.lang.String, java.lang.String, java.lang.String, 
> java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, 
> long, long, long, long, scala.Option, scala.Option, scala.Option, 
> scala.Option, scala.Option, java.lang.String, java.lang.String, 
> java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, 
> java.lang.String, int, double, double, java.lang.String, java.lang.String, 
> java.lang.String, long, java.lang.String, java.lang.String, java.lang.String, 
> java.lang.String, long, long, long, long, java.lang.String, 
> com.xyz.xyz.xyz.domain.Patient, com.xyz.xyz.xyz.domain.Physician, 
> scala.collection.Seq, scala.collection.Seq, java.lang.String, long, 
> java.lang.String, int, int, boolean, boolean, scala.collection.Seq, boolean, 
> scala.collection.Seq, boolean, scala.collection.mutable.Seq, 
> scala.collection.mutable.Seq)"
> The relevant lines are:
> build   17-Oct-2017 05:30:50 
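
For anyone still on 2.1.x, a hedged sketch of the shape of a possible workaround (class and field names are invented, and it assumes the fields can be switched to plain {{scala.collection.Seq}}):

{code}
// Hypothetical minimal version of the reported pattern, with the constructor
// fields declared as plain Seq so they match the constructor call emitted by
// the generated code.
case class ChargeInstitutional(amount: Double)

case class Account(
    id: String,
    chargesInst: Seq[ChargeInstitutional] = Seq.empty)

import spark.implicits._    // assumes a SparkSession named spark, e.g. in spark-shell
val ds = Seq(Account("a1", Seq(ChargeInstitutional(10.0)))).toDS
ds.show()
{code}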

[jira] [Commented] (SPARK-13030) Change OneHotEncoder to Estimator

2017-10-18 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16209203#comment-16209203
 ] 

Liang-Chi Hsieh commented on SPARK-13030:
-

[~josephkb] I think that since we are adding a new class, it is better to add 
multi-column support from the beginning.

> Change OneHotEncoder to Estimator
> -
>
> Key: SPARK-13030
> URL: https://issues.apache.org/jira/browse/SPARK-13030
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 1.6.0
>Reporter: Wojciech Jurczyk
>
> OneHotEncoder should be an Estimator, just like in scikit-learn 
> (http://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html).
> In its current form, it is impossible to use when the number of categories 
> differs between the training dataset and the test dataset.
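
A rough sketch of the fit/transform workflow being asked for, written against the multi-column {{OneHotEncoderEstimator}} API from 2.3 (column names and data are made up):

{code}
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import spark.implicits._

val train = Seq(0.0, 1.0, 2.0).toDF("categoryIndex")
val test  = Seq(0.0, 1.0).toDF("categoryIndex")   // fewer distinct categories

val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))

// fit() learns the number of categories from the training data, so the test
// rows are encoded with the same vector size even though a category is absent.
val model = encoder.fit(train)
model.transform(train).show()
model.transform(test).show()
{code}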



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22283) withColumn should replace multiple instances with a single one

2017-10-17 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208585#comment-16208585
 ] 

Liang-Chi Hsieh edited comment on SPARK-22283 at 10/17/17 11:43 PM:


[~kitbellew] {{withColumn}} adds a column or replaces the existing column that 
has the same name. In the ambiguous-columns case, it sounds reasonable that it 
replaces the columns with the same name.

We should let an API do one thing: {{withColumn}} adds or replaces the column 
with the same name. It sounds weird to implicitly drop one column when there 
are ambiguous columns.

For the use case {{a.join(b, ..., "left").withColumn("c", coalesce(a("c"), 
b("c")).select(..., "c")}}, if you just want to keep one of the ambiguous 
columns, a simple workaround is to select that column directly, e.g. 
{{a.join(b, ..., "left").select(..., a("c"))}}.



was (Author: viirya):
[~kitbellew] I didn't mean that you're doing a select. I meant that you can't 
select the ambiguous columns by name, so isn't it reasonable that you can't 
{{withColumn}} the ambiguous columns by name either? They follow the same behavior.

For the use case {{a.join(b, ..., "left").withColumn("c", coalesce(a("c"), 
b("c")).select(..., "c")}}, if you just want to keep one of the ambiguous 
columns, a simple workaround is to select that column directly, e.g. 
{{a.join(b, ..., "left").select(..., a("c"))}}.
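
A small sketch of that workaround pattern with invented data, disambiguating through the parent DataFrames rather than by bare column name:

{code}
import spark.implicits._
import org.apache.spark.sql.functions.{coalesce, col}

val a = Seq((1, "a1"), (2, "a2")).toDF("id", "c")
val b = Seq((1, "b1")).toDF("id", "c")

// The join leaves two columns named "c"; refer to them through a and b
// instead of by name, and collapse them into a single output column.
val joined = a.join(b, Seq("id"), "left")
val result = joined.select(col("id"), coalesce(a("c"), b("c")).as("c"))
result.show()
{code}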


> withColumn should replace multiple instances with a single one
> --
>
> Key: SPARK-22283
> URL: https://issues.apache.org/jira/browse/SPARK-22283
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Albert Meltzer
>
> Currently, {{withColumn}} claims to do the following: _"adding a column or 
> replacing the existing column that has the same name."_
> Unfortunately, if multiple existing columns have the same name (which is a 
> normal occurrence after a join), this results in multiple replaced -- and 
> retained --
>  columns (with the same value), and messages about an ambiguous column.
> The current implementation of {{withColumn}} contains this:
> {noformat}
>   def withColumn(colName: String, col: Column): DataFrame = {
> val resolver = sparkSession.sessionState.analyzer.resolver
> val output = queryExecution.analyzed.output
> val shouldReplace = output.exists(f => resolver(f.name, colName))
> if (shouldReplace) {
>   val columns = output.map { field =>
> if (resolver(field.name, colName)) {
>   col.as(colName)
> } else {
>   Column(field)
> }
>   }
>   select(columns : _*)
> } else {
>   select(Column("*"), col.as(colName))
> }
>   }
> {noformat}
> Instead, suggest something like this (which replaces all matching fields with 
> a single instance of the new one):
> {noformat}
>   def withColumn(colName: String, col: Column): DataFrame = {
> val resolver = sparkSession.sessionState.analyzer.resolver
> val output = queryExecution.analyzed.output
> val existing = output.filterNot(f => resolver(f.name, colName)).map(new 
> Column(_))
> select(existing :+ col.as(colName): _*)
>   }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22283) withColumn should replace multiple instances with a single one

2017-10-17 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208585#comment-16208585
 ] 

Liang-Chi Hsieh commented on SPARK-22283:
-

[~kitbellew] I didn't mean that you're doing a select. I meant that you can't 
select the ambiguous columns by name, so isn't it reasonable that you can't 
{{withColumn}} the ambiguous columns by name either? They follow the same behavior.

For the use case {{a.join(b, ..., "left").withColumn("c", coalesce(a("c"), 
b("c")).select(..., "c")}}, if you just want to keep one of the ambiguous 
columns, a simple workaround is to select that column directly, e.g. 
{{a.join(b, ..., "left").select(..., a("c"))}}.


> withColumn should replace multiple instances with a single one
> --
>
> Key: SPARK-22283
> URL: https://issues.apache.org/jira/browse/SPARK-22283
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Albert Meltzer
>
> Currently, {{withColumn}} claims to do the following: _"adding a column or 
> replacing the existing column that has the same name."_
> Unfortunately, if multiple existing columns have the same name (which is a 
> normal occurrence after a join), this results in multiple replaced -- and 
> retained --
>  columns (with the same value), and messages about an ambiguous column.
> The current implementation of {{withColumn}} contains this:
> {noformat}
>   def withColumn(colName: String, col: Column): DataFrame = {
> val resolver = sparkSession.sessionState.analyzer.resolver
> val output = queryExecution.analyzed.output
> val shouldReplace = output.exists(f => resolver(f.name, colName))
> if (shouldReplace) {
>   val columns = output.map { field =>
> if (resolver(field.name, colName)) {
>   col.as(colName)
> } else {
>   Column(field)
> }
>   }
>   select(columns : _*)
> } else {
>   select(Column("*"), col.as(colName))
> }
>   }
> {noformat}
> Instead, suggest something like this (which replaces all matching fields with 
> a single instance of the new one):
> {noformat}
>   def withColumn(colName: String, col: Column): DataFrame = {
> val resolver = sparkSession.sessionState.analyzer.resolver
> val output = queryExecution.analyzed.output
> val existing = output.filterNot(f => resolver(f.name, colName)).map(new 
> Column(_))
> select(existing :+ col.as(colName): _*)
>   }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22283) withColumn should replace multiple instances with a single one

2017-10-17 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207780#comment-16207780
 ] 

Liang-Chi Hsieh commented on SPARK-22283:
-

When the joined result has duplicate column names, you can't select any of the 
ambiguous columns by name alone. Doesn't {{withColumn}}'s current behavior simply 
follow that?


> withColumn should replace multiple instances with a single one
> --
>
> Key: SPARK-22283
> URL: https://issues.apache.org/jira/browse/SPARK-22283
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Albert Meltzer
>
> Currently, {{withColumn}} claims to do the following: _"adding a column or 
> replacing the existing column that has the same name."_
> Unfortunately, if multiple existing columns have the same name (which is a 
> normal occurrence after a join), this results in multiple replaced -- and 
> retained --
>  columns (with the same value), and messages about an ambiguous column.
> The current implementation of {{withColumn}} contains this:
> {noformat}
>   def withColumn(colName: String, col: Column): DataFrame = {
> val resolver = sparkSession.sessionState.analyzer.resolver
> val output = queryExecution.analyzed.output
> val shouldReplace = output.exists(f => resolver(f.name, colName))
> if (shouldReplace) {
>   val columns = output.map { field =>
> if (resolver(field.name, colName)) {
>   col.as(colName)
> } else {
>   Column(field)
> }
>   }
>   select(columns : _*)
> } else {
>   select(Column("*"), col.as(colName))
> }
>   }
> {noformat}
> Instead, suggest something like this (which replaces all matching fields with 
> a single instance of the new one):
> {noformat}
>   def withColumn(colName: String, col: Column): DataFrame = {
> val resolver = sparkSession.sessionState.analyzer.resolver
> val output = queryExecution.analyzed.output
> val existing = output.filterNot(f => resolver(f.name, colName)).map(new 
> Column(_))
> select(existing :+ col.as(colName): _*)
>   }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


