[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-06-27 Thread Barry Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16064972#comment-16064972
 ] 

Barry Becker commented on SPARK-20226:
--

Calling cache() on the dataframe right after the addColumn used to make this 
run fast, but around the time we upgraded to Spark 2.1.1 it got very slow 
again. Calling cache on the dataframe no longer seems to help.

If I hardcode the addColumn column expression to be 
{code}
(((((((((((CAST(Plate AS STRING) + CAST(State AS STRING)) + CAST(License Type 
AS STRING)) + CAST(Violation Time AS STRING)) + CAST(Violation AS STRING)) + 
CAST(Judgment Entry Date AS STRING)) + CAST(Issue Date AS STRING)) + 
CAST(Summons Number AS STRING)) + CAST(Fine Amount AS STRING)) + CAST(Penalty 
Amount AS STRING)) + CAST(Interest Amount AS STRING)) + CAST(Violation AS 
STRING))
{code}
instead of 
{code}
CAST(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(Plate, State), License Type), 
Violation Time), Violation), UDF(Judgment Entry Date)), UDF(Issue Date)), 
UDF(Summons Number)), UDF(Fine Amount)), UDF(Penalty Amount)), UDF(Interest 
Amount)), Violation) AS STRING)
{code}
which is what our expression parser generates, then the time drops from 70 
seconds down to 10 seconds. Still slow, but not nearly as slow.
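For comparison, a minimal sketch (not our actual parser output; {{df}} is a 
placeholder for the loaded dataframe) of building the same derived column with 
built-in casts and concat, so the plan contains no nested UDF nodes:
{code}
import org.apache.spark.sql.functions.{col, concat}

// Cast every referenced column to string and concatenate with the built-in
// concat function instead of chaining UDFs.
val parts = Seq("Plate", "State", "License Type", "Violation Time", "Violation",
  "Judgment Entry Date", "Issue Date", "Summons Number", "Fine Amount",
  "Penalty Amount", "Interest Amount")
  .map(c => col(c).cast("string"))

val withDerived = df.withColumn("columnBasedOnManyCols", concat(parts: _*))
{code}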

> Call to sqlContext.cacheTable takes an incredibly long time in some cases
> -
>
> Key: SPARK-20226
> URL: https://issues.apache.org/jira/browse/SPARK-20226
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
> Environment: linux or windows
>Reporter: Barry Becker
>  Labels: cache
> Attachments: profile_indexer2.PNG, xyzzy.csv
>
>
> I have a case where the call to sqlContext.cacheTable can take an arbitrarily 
> long time depending on the number of columns that are referenced in a 
> withColumn expression applied to a dataframe.
> The dataset is small (20 columns, 7861 rows). The sequence to reproduce is the 
> following:
> 1) add a new column that references 8 - 14 of the columns in the dataset. 
>- If I add 8 columns, then the call to cacheTable is fast - like *5 
> seconds*
>- If I add 11 columns, then it is slow - like *60 seconds*
>- and if I add 14 columns, then it basically *takes forever* - I gave up 
> after 10 minutes or so.
>   The Column expression that is added is basically just concatenating 
> the columns together into a single string. If a number is concatenated with a 
> string (or vice versa), the number is first converted to a string.
>   The expression looks something like this:
> {code}
> `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + 
> `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + 
> `Penalty Amount` + `Interest Amount`
> {code}
> which we then convert to a Column expression that looks like this:
> {code}
> UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), 
> UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), 
> UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), 
> UDF('Interest Amount))
> {code}
>where the UDFs are very simple functions that basically call toString 
> and + as needed.
> 2) apply a pipeline (saved earlier) that includes some transformers. 
> Here are the steps of the pipeline (extracted from parquet):
>  - 
> {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code}
>  - 
> {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License
>  Type_CLEANED__","handleInvalid":"skip","outputCol":"License 
> Type_IDX__"}}{code}
>  - 
> {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code}
>  - 
> {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code}
>  - 
> {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing
>  Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code}
>  - 
> {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code}
>  - 
> {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0","uid":"bucketizer_6f65ca9fa813","paramMap":{"outputCol":"Summons Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"inputCol":"Summons Number_CLEANED__"}}{code}
>  - 
> {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202079,"sparkVersion":"2.1.0","uid":"bucketizer_f5db4fb8120e","paramMap":{"splits":["-Inf",1.435215616E9,1.443855616E9,1.447271936E9,1.448222464E9,1.448395264E9,1.448481536E9,1.448827136E9,1.449259264E9,1.449432064E9,1.449518336E9,"Inf"],"handleInvalid":"keep","outputCol":"Issue Date_BINNED__","inputCol":"Issue Date_CLEANED__"}}{code}

[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-07 Thread Barry Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960868#comment-15960868
 ] 

Barry Becker commented on SPARK-20226:
--

Only 11 columns. I did not want to wait 10 or 20 minutes on each run, so I 
used only 11. With 14 columns it would take over 10 minutes. I guess I could 
try it again with 14 and see how much it helps; maybe in that case it would 
make a bigger difference, but even waiting a minute for such a small dataset 
seems too long.


[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-07 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960835#comment-15960835
 ] 

Liang-Chi Hsieh commented on SPARK-20226:
-

How many columns were added in the above runs? I don't see the long running 
time (more than 10 minutes) that you reported in the JIRA description.

For big query plans, constraint propagation can hit a combinatorial explosion 
and block the driver for a long time, so there is a flag, 
"spark.sql.constraintPropagation.enabled", to disable it.

For relatively small query plans (which I assume the above runs are, given the 
shorter running times), this flag doesn't make a significant difference.

If you cache the table each time after adding a column, the query plan is 
finalized at that point, so you will not hit the constraint propagation issue.
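For reference, a minimal sketch of setting the flag from application code 
(whether the key is honored depends on the Spark version; {{sqlContext}} / 
{{spark}} are whatever handles the application already has):
{code}
// Disable constraint propagation for the current SQL session.
sqlContext.setConf("spark.sql.constraintPropagation.enabled", "false")

// Equivalent call on a SparkSession:
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
{code}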


[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-07 Thread Barry Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960806#comment-15960806
 ] 

Barry Becker commented on SPARK-20226:
--

OK, I set the flag using 
sqlContext.setConf("spark.sql.constraintPropagation.enabled", "false") to be 
sure it is set.
Here are my results. I averaged a few runs in each case to get a more accurate 
reading; the variance was pretty high.
{code}
cacheTable  applyPipeline  total   cache after add column  "spark.sql.constraintPropagation.enabled"
----------  -------------  ------  ----------------------  ------------------------------------------
71s         8.5s           1m 30s  no caching              "true"
53s         8.5s           1m 10s  no caching              "false"
65s         8.5s           1m 23s  no caching              nothing set (true is the default, I assume)
 1s         8.4s           21s     caching                 nothing set
{code}
As you can see, it gets a little better with constraint propagation off, but 
not nearly as good as caching the dataframe before applying the pipeline. Why 
is caching before the pipeline such a big win if the pipeline stages are 
applied linearly?


[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-06 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960001#comment-15960001
 ] 

Liang-Chi Hsieh commented on SPARK-20226:
-

{{spark.sql.constraintPropagation.enabled}} is a SQL config flag. I am not sure 
if your local.conf only covers Spark configuration via SparkConf. Can you 
explicitly set this flag in your application through SQLConf?


[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-06 Thread Barry Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959134#comment-15959134
 ] 

Barry Becker commented on SPARK-20226:
--

Yes. We are running through spark-jobserver, and local.conf is where all the 
Spark options go (in a section called context-settings {...}).


[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-06 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959072#comment-15959072
 ] 

Liang-Chi Hsieh commented on SPARK-20226:
-

I am not sure what the job-server local.conf is. Does it really affect Spark 
SQL conf?


[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-06 Thread Barry Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959024#comment-15959024
 ] 

Barry Becker commented on SPARK-20226:
--

I set spark.sql.constraintPropagation.enabled to false in job-server local.conf 
and tried again.
It did not help. It still took about 2 minutes. Oddly, setting it to true 
seemed to make it worse.
I did find something that did work, though. If I simply call cache() on the 
dataframe after the add column (right after step 1 above), then it runs very 
quickly: the time spent in cacheTable goes from 60 seconds to 0.5 seconds. I 
don't understand why, though. I thought calling cache would only help if there 
was branching, but the pipeline is linear, isn't it?
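In other words (a minimal sketch; {{df}}, {{concatColumn}} and 
{{pipelineModel}} are stand-ins for the actual objects):
{code}
// Step 1: add the derived column, then materialize it before the pipeline runs.
val withCol = df.withColumn("columnBasedOnManyCols", concatColumn)
withCol.cache()   // the workaround described above

// Step 2: apply the previously saved pipeline and cache the resulting table.
val transformed = pipelineModel.transform(withCol)
transformed.createOrReplaceTempView("foo123")
sqlContext.cacheTable("foo123")   // now ~0.5s instead of ~60s
{code}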

Here is what the query plan looks like in the call to cache the dataframe 
before transforming with the pipeline.
{code}
Project [Plate#6, State#7, License Type#8, Summons Number#9, Issue Date#10, 
Violation Time#11, Violation#12, Judgment Entry Date#13, Fine Amount#14, 
Penalty Amount#15, Interest Amount#16, Reduction Amount#17, Payment Amount#18, 
Amount Due#19, Precinct#20, County#21, Issuing Agency#22, Violation Status#23, 
cast(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(Plate#6, State#7), License 
Type#8), Violation Time#11), Violation#12), UDF(Judgment Entry Date#13)), 
UDF(Issue Date#10)), UDF(Summons Number#9)), UDF(Fine Amount#14)), UDF(Penalty 
Amount#15)), UDF(Interest Amount#16)) as string) AS columnBasedOnManyCols#43]
+- Relation[Plate#6,State#7,License Type#8,Summons Number#9,Issue 
Date#10,Violation Time#11,Violation#12,Judgment Entry Date#13,Fine 
Amount#14,Penalty Amount#15,Interest Amount#16,Reduction Amount#17,Payment 
Amount#18,Amount Due#19,Precinct#20,County#21,Issuing Agency#22,Violation 
Status#23] csv
{code}
Here is how the query plan looks in the call to cacheTable after 
transforming with the pipeline. It looks fairly similar to what it was before, 
but now it's fast.
{code}
SubqueryAlias foo123, `foo123`
+- Project [Plate#236, State#237, License Type#238, Summons Number#239, Issue 
Date#240, Violation Time#241, Violation#242, Judgment Entry Date#243, Fine 
Amount#244, Penalty Amount#245, Interest Amount#246, Reduction Amount#247, 
Payment Amount#248, Amount Due#249, Precinct#250, County#251, Issuing 
Agency#252, Violation Status#253, columnBasedOnManyCols#254, Penalty Amount 
(predicted)#2476]
   +- Project [Plate#236, Plate_CLEANED__#275, State#237, State_CLEANED__#276, 
License Type#238, License Type_CLEANED__#277, Summons Number_CLEANED__#362, 
Summons Number#239, Issue Date#240, Issue Date_CLEANED__#323, Violation 
Time#241, Violation Time_CLEANED__#279, Violation#242, Violation_CLEANED__#280, 
Judgment Entry Date#243, Judgment Entry Date_CLEANED__#324, Fine Amount#244, 
Fine Amount_CLEANED__#325, Penalty Amount#245, Penalty Amount_CLEANED__#326, 
Interest Amount_CLEANED__#363, Interest Amount#246, Reduction 
Amount_CLEANED__#364, Reduction Amount#247, ... 33 more fields]
  +- Project [Plate#236, Plate_CLEANED__#275, State#237, 
State_CLEANED__#276, License Type#238, License Type_CLEANED__#277, Summons 
Number_CLEANED__#362, Summons Number#239, Issue Date#240, Issue 
Date_CLEANED__#323, Violation Time#241, Violation Time_CLEANED__#279, 
Violation#242, Violation_CLEANED__#280, Judgment Entry Date#243, Judgment Entry 
Date_CLEANED__#324, Fine Amount#244, Fine Amount_CLEANED__#325, Penalty 
Amount#245, Penalty Amount_CLEANED__#326, Interest Amount_CLEANED__#363, 
Interest Amount#246, Reduction Amount_CLEANED__#364, Reduction Amount#247, ... 
33 more fields]
 +- SubqueryAlias sql_1ea4c1b5c52e_cd062499a688, 
`sql_1ea4c1b5c52e_cd062499a688`
+- Project [Plate#236, Plate_CLEANED__#275, State#237, 
State_CLEANED__#276, License Type#238, License Type_CLEANED__#277, Summons 
Number_CLEANED__#362, Summons Number#239, Issue Date#240, Issue 
Date_CLEANED__#323, Violation Time#241, Violation Time_CLEANED__#279, 
Violation#242, Violation_CLEANED__#280, Judgment Entry Date#243, Judgment Entry 
Date_CLEANED__#324, Fine Amount#244, Fine Amount_CLEANED__#325, Penalty 
Amount#245, Penalty Amount_CLEANED__#326, Interest Amount_CLEANED__#363, 
Interest Amount#246, Reduction Amount_CLEANED__#364, Reduction Amount#247, ... 
32 more fields]
   +- Project [Plate#236, Plate_CLEANED__#275, State#237, 
State_CLEANED__#276, License Type#238, License Type_CLEANED__#277, Summons 
Number_CLEANED__#362, Summons Number#239, Issue Date#240, Issue 
Date_CLEANED__#323, Violation Time#241, Violation Time_CLEANED__#279, 
Violation#242, Violation_CLEANED__#280, Judgment Entry Date#243, Judgment Entry 
Date_CLEANED__#324, Fine Amount#244, Fine Amount_CLEANED__#325, Penalty 
Amount#245, Penalty Amount_CLEANED__#326, Interest Amount_CLEANED__#363, 
Interest Amount#246, Reduction Amount_CLEANED__#364, Reduction Amount#247, ... 
31 more fields]
  +- Project [Plate#236, Plate_CLEANED__#275, 

[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-06 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958512#comment-15958512
 ] 

Sean Owen commented on SPARK-20226:
---

Yeah, it does look like it's slowing down not purely because of the 
StringIndexerModel per se, but because of something in the planner.


[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-05 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958259#comment-15958259
 ] 

Liang-Chi Hsieh commented on SPARK-20226:
-

[~barrybecker4] Can you disable the config flag 
{{spark.sql.constraintPropagation.enabled}} and try again?

Thanks.


[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-05 Thread Barry Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15957732#comment-15957732
 ] 

Barry Becker commented on SPARK-20226:
--

I did some profiling using the sampler in JVisualVM and took some thread dumps 
at different points while it was running.
The main hotspot indicated by the sampler is 
scala.collection.GenSeqLike$class.equals at the lowest level, and the thread 
dumps show that this is what is being called at the bottom of the stack.
Below are two representative thread dumps taken while it was processing:
{code}
"main-ScalaTest-running-ApplyModelSuite@1" prio=5 tid=0x1 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
  at scala.collection.GenSeqLike$class.equals(GenSeqLike.scala:474)
  at scala.collection.AbstractSeq.equals(Seq.scala:41)
  at 
org.apache.spark.sql.catalyst.expressions.ScalaUDF.equals(ScalaUDF.scala:39)
  at 
scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized.scala:167)
  at 
scala.collection.mutable.ArrayBuffer.sameElements(ArrayBuffer.scala:48)
  at scala.collection.GenSeqLike$class.equals(GenSeqLike.scala:474)
  at scala.collection.AbstractSeq.equals(Seq.scala:41)
  at 
org.apache.spark.sql.catalyst.expressions.ScalaUDF.equals(ScalaUDF.scala:39)
  at 
scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized.scala:167)
  at 
scala.collection.mutable.ArrayBuffer.sameElements(ArrayBuffer.scala:48)
  at scala.collection.GenSeqLike$class.equals(GenSeqLike.scala:474)
  at scala.collection.AbstractSeq.equals(Seq.scala:41)
  at 
org.apache.spark.sql.catalyst.expressions.ScalaUDF.equals(ScalaUDF.scala:39)
  at 
scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized.scala:167)
  at 
scala.collection.mutable.ArrayBuffer.sameElements(ArrayBuffer.scala:48)
  at scala.collection.GenSeqLike$class.equals(GenSeqLike.scala:474)
  at scala.collection.AbstractSeq.equals(Seq.scala:41)
  at 
org.apache.spark.sql.catalyst.expressions.ScalaUDF.equals(ScalaUDF.scala:39)
  at 
org.apache.spark.sql.catalyst.expressions.Cast.equals(Cast.scala:123)
  at 
org.apache.spark.sql.catalyst.expressions.EqualNullSafe.equals(predicates.scala:446)
  at 
scala.collection.mutable.FlatHashTable$class.addEntry(FlatHashTable.scala:151)
  at scala.collection.mutable.HashSet.addEntry(HashSet.scala:40)
  at 
scala.collection.mutable.FlatHashTable$class.growTable(FlatHashTable.scala:225)
  at 
scala.collection.mutable.FlatHashTable$class.addEntry(FlatHashTable.scala:159)
  at scala.collection.mutable.HashSet.addEntry(HashSet.scala:40)
  at 
scala.collection.mutable.FlatHashTable$class.addElem(FlatHashTable.scala:139)
  at scala.collection.mutable.HashSet.addElem(HashSet.scala:40)
  at scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:59)
  at scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:40)
  at 
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
  at 
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
  at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
  at scala.collection.mutable.AbstractSet.$plus$plus$eq(Set.scala:46)
  at scala.collection.mutable.HashSet.clone(HashSet.scala:83)
  at scala.collection.mutable.HashSet.clone(HashSet.scala:40)
  at 
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:65)
  at 
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:50)
  at 
scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141)
  at 
scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142)
  at 
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322)
  at 
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978)
  at 
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978)
  at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:142)
  at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
  at 
scala.collection.TraversableOnce$class.$div$colon(TraversableOnce.scala:136)
  at 
scala.collection.AbstractTraversable.$div$colon(Traversable.scala:104)
  at scala.collection.SetLike$class.$plus$plus(SetLike.scala:141)
  at 

[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-05 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15957496#comment-15957496
 ] 

Sean Owen commented on SPARK-20226:
---

It could just look that way because caching means evaluating, and evaluating 
takes the time. It seems like you've shown it's all due to time in the 
StringIndexerModels, no? I think you'd be best off finding out where it's 
spending time. Calling {{kill -QUIT (pid)}} a couple times on an executor is a 
good poor man's way to see where the threads are spending time. (Real profiling 
isn't hard either if you have something like JProfiler)


[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-05 Thread Barry Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15957489#comment-15957489
 ] 

Barry Becker commented on SPARK-20226:
--

I thought the problem was in the cacheTable call because that is where all the 
time was spent. Do you think the problem is with the generated query plan or 
something else? Any hints or tips you could provide would be appreciated.
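Not from the ticket, just a sketch of one way to check: printing the plans before caching shows whether the plan itself blows up as more columns are referenced ("myTable" is a placeholder for the registered table name):
{code}
// Sketch: inspect the plans Spark builds for the table before caching it.
// "myTable" is a placeholder for the actual registered table name.
val df = sqlContext.table("myTable")

// Prints the parsed, analyzed, optimized and physical plans (Spark 2.x).
df.explain(true)

// The same plans are available programmatically:
println(df.queryExecution.analyzed.treeString)
println(df.queryExecution.optimizedPlan.treeString)
{code}
If the optimized plan grows dramatically as more columns are fed to the UDF, the time is likely going into analysis/optimization of the plan rather than into the caching itself.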

[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-05 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15957468#comment-15957468
 ] 

Sean Owen commented on SPARK-20226:
---

OK, this doesn't sound like it's anything to do with SQLContext or caching. 

[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-05 Thread Barry Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15957457#comment-15957457
 ] 

Barry Becker commented on SPARK-20226:
--

It seems like it has to do with the interaction between the StringIndexerModels 
in the pipeline and the column that was added in the first step. None of my 
StringIndexers are applied to columns with more than 100 values.

Removing the bucketizers from the pipeline had much less of an impact than 
removing the StringIndexerModels. There were 6 StringIndexerModels and 8 
Bucketizers in the original pipeline, which took 80s to cache. When I remove all 
the StringIndexers from the pipeline, the time goes way down to 0.3s. When I 
remove all the Bucketizers, the time only goes down to 66s when calling 
cacheTable.
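A rough sketch of that timing experiment (placeholder column/table names, assuming a spark-shell style session with sqlContext and the input DataFrame df already in scope - not the actual pipeline code):
{code}
// Sketch: time cacheTable on the pipeline output with and without the
// StringIndexer stages. All names below are placeholders.
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{Bucketizer, StringIndexer}
import org.apache.spark.sql.DataFrame

def timeCache(stages: Array[PipelineStage], df: DataFrame): Double = {
  val transformed = new Pipeline().setStages(stages).fit(df).transform(df)
  transformed.createOrReplaceTempView("test_table")
  val start = System.nanoTime()
  sqlContext.cacheTable("test_table")
  transformed.count()                     // force the cache to materialize
  val secs = (System.nanoTime() - start) / 1e9
  sqlContext.uncacheTable("test_table")
  secs
}

val indexer = new StringIndexer()
  .setInputCol("State_CLEANED__").setOutputCol("State_IDX__").setHandleInvalid("skip")
val bucketizer = new Bucketizer()
  .setInputCol("Summons Number_CLEANED__").setOutputCol("Summons Number_BINNED__")
  .setSplits(Array(Double.NegativeInfinity, 1.386630656e9, 8.136507392e9, Double.PositiveInfinity))

println(s"indexer + bucketizer: ${timeCache(Array[PipelineStage](indexer, bucketizer), df)}s")
println(s"bucketizer only:      ${timeCache(Array[PipelineStage](bucketizer), df)}s")
{code}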


[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases

2017-04-05 Thread Barry Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15957296#comment-15957296
 ] 

Barry Becker commented on SPARK-20226:
--

We noticed that this is reproducible just by adding a new column that is based 
on a very simple UDF like
{code}
def newColumn = udf((values: Row) => {
  "foo"
}) 
{code}
The time taken to cache the query plan is highly dependent on the number of 
columns that are passed to this udf - even though they are not used by it. If 8 
or fewer columns are in the Row passed in, then everything is reasonably fast, 
but as soon as you get to 11 or so, things slow dramatically. By the time you 
get to 14 columns, it takes forever. I will try using a simpler pipeline to see 
if I can eliminate either the StringIndexers or Bucketizers as contributing 
factors.
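For reference, this is roughly how the experiment looks end to end (a sketch, assuming a spark-shell session with df and sqlContext in scope; passing the columns through struct() is the usual way to feed a Row to a UDF, and is not necessarily identical to our application code):
{code}
// Sketch: apply the trivial Row-based UDF over the first n columns and time
// how long cacheTable takes as n grows. df and sqlContext assumed in scope.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

val newColumn = udf((values: Row) => "foo")

def timeWithNColumns(n: Int): Double = {
  val selected = df.columns.take(n).map(col)
  val withCol  = df.withColumn("NEW_COL", newColumn(struct(selected: _*)))
  withCol.createOrReplaceTempView("udf_test")
  val start = System.nanoTime()
  sqlContext.cacheTable("udf_test")
  withCol.count()                         // force the cache to materialize
  val secs = (System.nanoTime() - start) / 1e9
  sqlContext.uncacheTable("udf_test")
  secs
}

Seq(8, 11, 14).foreach(n => println(s"$n columns: ${timeWithNColumns(n)}s"))
{code}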
