[jira] [Commented] (SPARK-15703) Make ListenerBus event queue size configurable

2018-04-16 Thread rohit verma (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16440377#comment-16440377
 ] 

rohit verma commented on SPARK-15703:
-

Hi Guys, 

I am wondering whether this issue has been resolved in the 2.0.x/2.1.x 
releases, because I am using Spark 2.2.0 and am facing the same issue. I did not 
find anything documented regarding this.

 

Thanks.
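For anyone hitting this on the 2.x line, a minimal sketch of raising the queue size introduced by this fix; the key name below is the one added for 2.0.1/2.1.0 and is assumed to still be honored by 2.2.x (verify against the docs of the exact version in use):

{code:python}
# Minimal sketch: raise the listener bus event queue capacity (default 10000)
# when building the session. The config key is an assumption based on the fix
# for this ticket; check the documentation of the Spark version actually deployed.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("listener-bus-queue-demo")
    .config("spark.scheduler.listenerbus.eventqueue.size", "100000")
    .getOrCreate()
)
{code}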

> Make ListenerBus event queue size configurable
> --
>
> Key: SPARK-15703
> URL: https://issues.apache.org/jira/browse/SPARK-15703
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, Web UI
>Affects Versions: 2.0.0
>Reporter: Thomas Graves
>Assignee: Dhruve Ashar
>Priority: Minor
> Fix For: 2.0.1, 2.1.0
>
> Attachments: Screen Shot 2016-06-01 at 11.21.32 AM.png, Screen Shot 
> 2016-06-01 at 11.23.48 AM.png, SparkListenerBus .png, 
> spark-dynamic-executor-allocation.png
>
>
> The Spark UI doesn't seem to be showing all the tasks and metrics.
> I ran a job with 10 tasks, but the Detail stage page says it completed 93029:
> Summary Metrics for 93029 Completed Tasks
> The Stages for All Jobs page lists that only 89519/10 tasks finished, but it's 
> completed.  The metrics for shuffle write and input are also incorrect.
> I will attach screen shots.
> I checked the logs and it does show that all the tasks actually finished.
> 16/06/01 16:15:42 INFO TaskSetManager: Finished task 59880.0 in stage 2.0 
> (TID 54038) in 265309 ms on 10.213.45.51 (10/10)
> 16/06/01 16:15:42 INFO YarnClusterScheduler: Removed TaskSet 2.0, whose tasks 
> have all completed, from pool



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23206) Additional Memory Tuning Metrics

2018-04-16 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16440334#comment-16440334
 ] 

Imran Rashid commented on SPARK-23206:
--

thanks, shared doc works for me now!

> Additional Memory Tuning Metrics
> 
>
> Key: SPARK-23206
> URL: https://issues.apache.org/jira/browse/SPARK-23206
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
> Attachments: ExecutorsTab.png, ExecutorsTab2.png, 
> MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png
>
>
> At LinkedIn, we have multiple clusters, running thousands of Spark 
> applications, and these numbers are growing rapidly. We need to ensure that 
> these Spark applications are well tuned – cluster resources, including 
> memory, should be used efficiently so that the cluster can support running 
> more applications concurrently, and applications should run quickly and 
> reliably.
> Currently there is limited visibility into how much memory executors are 
> using, and users are guessing numbers for executor and driver memory sizing. 
> These estimates are often much larger than needed, leading to memory wastage. 
> Examining the metrics for one cluster for a month, the average percentage of 
> used executor memory (max JVM used memory across executors /  
> spark.executor.memory) is 35%, leading to an average of 591GB unused memory 
> per application (number of executors * (spark.executor.memory - max JVM used 
> memory)). Spark has multiple memory regions (user memory, execution memory, 
> storage memory, and overhead memory), and to understand how memory is being 
> used and fine-tune allocation between regions, it would be useful to have 
> information about how much memory is being used for the different regions.
> To improve visibility into memory usage for the driver and executors and 
> different memory regions, the following additional memory metrics can be 
> tracked for each executor and driver:
>  * JVM used memory: the JVM heap size for the executor/driver.
>  * Execution memory: memory used for computation in shuffles, joins, sorts 
> and aggregations.
>  * Storage memory: memory used for caching and propagating internal data across 
> the cluster.
>  * Unified memory: sum of execution and storage memory.
> The peak values for each memory metric can be tracked for each executor, and 
> also per stage. This information can be shown in the Spark UI and the REST 
> APIs. Information for peak JVM used memory can help with determining 
> appropriate values for spark.executor.memory and spark.driver.memory, and 
> information about the unified memory region can help with determining 
> appropriate values for spark.memory.fraction and 
> spark.memory.storageFraction. Stage memory information can help identify 
> which stages are most memory intensive, and users can look into the relevant 
> code to determine if it can be optimized.
> The memory metrics can be gathered by adding the current JVM used memory, 
> execution memory and storage memory to the heartbeat. SparkListeners are 
> modified to collect the new metrics for the executors, stages and Spark 
> history log. Only interesting values (peak values per stage per executor) are 
> recorded in the Spark history log, to minimize the amount of additional 
> logging.
> We have attached our design documentation with this ticket and would like to 
> receive feedback from the community for this proposal.
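To make the sizing arithmetic in the description concrete, a small illustrative calculation (the figures below are hypothetical, not taken from the cluster study above):

{code:python}
# Illustrative only: unused executor memory per application, using the formula
# from the description:
#   unused = number of executors * (spark.executor.memory - max JVM used memory)
num_executors = 100        # hypothetical application
executor_memory_gb = 16.0  # spark.executor.memory
max_jvm_used_gb = 5.6      # peak JVM used memory across executors (~35% usage)

unused_gb = num_executors * (executor_memory_gb - max_jvm_used_gb)
print(f"unused memory per application: {unused_gb:.0f} GB")  # -> 1040 GB
{code}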






[jira] [Commented] (SPARK-23206) Additional Memory Tuning Metrics

2018-04-16 Thread Edwina Lu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16440326#comment-16440326
 ] 

Edwina Lu commented on SPARK-23206:
---

[~smilegator], please try: 
https://docs.google.com/document/d/1fIL2XMHPnqs6kaeHr822iTvs08uuYnjP5roSGZfejyA/edit?usp=sharing

> Additional Memory Tuning Metrics
> 
>
> Key: SPARK-23206
> URL: https://issues.apache.org/jira/browse/SPARK-23206
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
> Attachments: ExecutorsTab.png, ExecutorsTab2.png, 
> MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png
>
>
> At LinkedIn, we have multiple clusters, running thousands of Spark 
> applications, and these numbers are growing rapidly. We need to ensure that 
> these Spark applications are well tuned – cluster resources, including 
> memory, should be used efficiently so that the cluster can support running 
> more applications concurrently, and applications should run quickly and 
> reliably.
> Currently there is limited visibility into how much memory executors are 
> using, and users are guessing numbers for executor and driver memory sizing. 
> These estimates are often much larger than needed, leading to memory wastage. 
> Examining the metrics for one cluster for a month, the average percentage of 
> used executor memory (max JVM used memory across executors /  
> spark.executor.memory) is 35%, leading to an average of 591GB unused memory 
> per application (number of executors * (spark.executor.memory - max JVM used 
> memory)). Spark has multiple memory regions (user memory, execution memory, 
> storage memory, and overhead memory), and to understand how memory is being 
> used and fine-tune allocation between regions, it would be useful to have 
> information about how much memory is being used for the different regions.
> To improve visibility into memory usage for the driver and executors and 
> different memory regions, the following additional memory metrics can be 
> tracked for each executor and driver:
>  * JVM used memory: the JVM heap size for the executor/driver.
>  * Execution memory: memory used for computation in shuffles, joins, sorts 
> and aggregations.
>  * Storage memory: memory used for caching and propagating internal data across 
> the cluster.
>  * Unified memory: sum of execution and storage memory.
> The peak values for each memory metric can be tracked for each executor, and 
> also per stage. This information can be shown in the Spark UI and the REST 
> APIs. Information for peak JVM used memory can help with determining 
> appropriate values for spark.executor.memory and spark.driver.memory, and 
> information about the unified memory region can help with determining 
> appropriate values for spark.memory.fraction and 
> spark.memory.storageFraction. Stage memory information can help identify 
> which stages are most memory intensive, and users can look into the relevant 
> code to determine if it can be optimized.
> The memory metrics can be gathered by adding the current JVM used memory, 
> execution memory and storage memory to the heartbeat. SparkListeners are 
> modified to collect the new metrics for the executors, stages and Spark 
> history log. Only interesting values (peak values per stage per executor) are 
> recorded in the Spark history log, to minimize the amount of additional 
> logging.
> We have attached our design documentation with this ticket and would like to 
> receive feedback from the community for this proposal.






[jira] [Comment Edited] (SPARK-23206) Additional Memory Tuning Metrics

2018-04-16 Thread Edwina Lu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438879#comment-16438879
 ] 

Edwina Lu edited comment on SPARK-23206 at 4/17/18 3:15 AM:


After discussion with [~irashid] on the PR, we've decided to move 
ExecutorMetricsUpdate logging to stage end, to minimize the amount of extra 
logging. The updated design doc: 
https://docs.google.com/document/d/1fIL2XMHPnqs6kaeHr822iTvs08uuYnjP5roSGZfejyA/edit?usp=sharing

[^SPARK-23206 Design Doc.pdf]


was (Author: elu):
After discussion with [~irashid] on the PR, we've decided to move 
ExecutorMetricsUpdate logging to stage end, to minimize the amount of extra 
logging. The updated design doc: 
[https://docs.google.com/document/d/1vLojop9I4WkpUdbrSnoHzJ6jkCMnH2Ot5JTSk7YEX5s/edit?usp=sharing|https://docs.google.com/document/d/1fIL2XMHPnqs6kaeHr822iTvs08uuYnjP5roSGZfejyA/edit?usp=sharing]

[^SPARK-23206 Design Doc.pdf]

> Additional Memory Tuning Metrics
> 
>
> Key: SPARK-23206
> URL: https://issues.apache.org/jira/browse/SPARK-23206
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
> Attachments: ExecutorsTab.png, ExecutorsTab2.png, 
> MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png
>
>
> At LinkedIn, we have multiple clusters, running thousands of Spark 
> applications, and these numbers are growing rapidly. We need to ensure that 
> these Spark applications are well tuned – cluster resources, including 
> memory, should be used efficiently so that the cluster can support running 
> more applications concurrently, and applications should run quickly and 
> reliably.
> Currently there is limited visibility into how much memory executors are 
> using, and users are guessing numbers for executor and driver memory sizing. 
> These estimates are often much larger than needed, leading to memory wastage. 
> Examining the metrics for one cluster for a month, the average percentage of 
> used executor memory (max JVM used memory across executors /  
> spark.executor.memory) is 35%, leading to an average of 591GB unused memory 
> per application (number of executors * (spark.executor.memory - max JVM used 
> memory)). Spark has multiple memory regions (user memory, execution memory, 
> storage memory, and overhead memory), and to understand how memory is being 
> used and fine-tune allocation between regions, it would be useful to have 
> information about how much memory is being used for the different regions.
> To improve visibility into memory usage for the driver and executors and 
> different memory regions, the following additional memory metrics can be 
> tracked for each executor and driver:
>  * JVM used memory: the JVM heap size for the executor/driver.
>  * Execution memory: memory used for computation in shuffles, joins, sorts 
> and aggregations.
>  * Storage memory: memory used for caching and propagating internal data across 
> the cluster.
>  * Unified memory: sum of execution and storage memory.
> The peak values for each memory metric can be tracked for each executor, and 
> also per stage. This information can be shown in the Spark UI and the REST 
> APIs. Information for peak JVM used memory can help with determining 
> appropriate values for spark.executor.memory and spark.driver.memory, and 
> information about the unified memory region can help with determining 
> appropriate values for spark.memory.fraction and 
> spark.memory.storageFraction. Stage memory information can help identify 
> which stages are most memory intensive, and users can look into the relevant 
> code to determine if it can be optimized.
> The memory metrics can be gathered by adding the current JVM used memory, 
> execution memory and storage memory to the heartbeat. SparkListeners are 
> modified to collect the new metrics for the executors, stages and Spark 
> history log. Only interesting values (peak values per stage per executor) are 
> recorded in the Spark history log, to minimize the amount of additional 
> logging.
> We have attached our design documentation with this ticket and would like to 
> receive feedback from the community for this proposal.






[jira] [Comment Edited] (SPARK-23206) Additional Memory Tuning Metrics

2018-04-16 Thread Edwina Lu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438879#comment-16438879
 ] 

Edwina Lu edited comment on SPARK-23206 at 4/17/18 3:14 AM:


After discussion with [~irashid] on the PR, we've decided to move 
ExecutorMetricsUpdate logging to stage end, to minimize the amount of extra 
logging. The updated design doc: 
[https://docs.google.com/document/d/1vLojop9I4WkpUdbrSnoHzJ6jkCMnH2Ot5JTSk7YEX5s/edit?usp=sharing|https://docs.google.com/document/d/1fIL2XMHPnqs6kaeHr822iTvs08uuYnjP5roSGZfejyA/edit?usp=sharing]

[^SPARK-23206 Design Doc.pdf]


was (Author: elu):
After discussion with [~irashid] on the PR, we've decided to move 
ExecutorMetricsUpdate logging to stage end, to minimize the amount of extra 
logging. The updated design doc: 
[https://docs.google.com/document/d/1vLojop9I4WkpUdbrSnoHzJ6jkCMnH2Ot5JTSk7YEX5s/edit?usp=sharing]

[^SPARK-23206 Design Doc.pdf]

> Additional Memory Tuning Metrics
> 
>
> Key: SPARK-23206
> URL: https://issues.apache.org/jira/browse/SPARK-23206
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
> Attachments: ExecutorsTab.png, ExecutorsTab2.png, 
> MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png
>
>
> At LinkedIn, we have multiple clusters, running thousands of Spark 
> applications, and these numbers are growing rapidly. We need to ensure that 
> these Spark applications are well tuned – cluster resources, including 
> memory, should be used efficiently so that the cluster can support running 
> more applications concurrently, and applications should run quickly and 
> reliably.
> Currently there is limited visibility into how much memory executors are 
> using, and users are guessing numbers for executor and driver memory sizing. 
> These estimates are often much larger than needed, leading to memory wastage. 
> Examining the metrics for one cluster for a month, the average percentage of 
> used executor memory (max JVM used memory across executors /  
> spark.executor.memory) is 35%, leading to an average of 591GB unused memory 
> per application (number of executors * (spark.executor.memory - max JVM used 
> memory)). Spark has multiple memory regions (user memory, execution memory, 
> storage memory, and overhead memory), and to understand how memory is being 
> used and fine-tune allocation between regions, it would be useful to have 
> information about how much memory is being used for the different regions.
> To improve visibility into memory usage for the driver and executors and 
> different memory regions, the following additional memory metrics can be 
> tracked for each executor and driver:
>  * JVM used memory: the JVM heap size for the executor/driver.
>  * Execution memory: memory used for computation in shuffles, joins, sorts 
> and aggregations.
>  * Storage memory: memory used for caching and propagating internal data across 
> the cluster.
>  * Unified memory: sum of execution and storage memory.
> The peak values for each memory metric can be tracked for each executor, and 
> also per stage. This information can be shown in the Spark UI and the REST 
> APIs. Information for peak JVM used memory can help with determining 
> appropriate values for spark.executor.memory and spark.driver.memory, and 
> information about the unified memory region can help with determining 
> appropriate values for spark.memory.fraction and 
> spark.memory.storageFraction. Stage memory information can help identify 
> which stages are most memory intensive, and users can look into the relevant 
> code to determine if it can be optimized.
> The memory metrics can be gathered by adding the current JVM used memory, 
> execution memory and storage memory to the heartbeat. SparkListeners are 
> modified to collect the new metrics for the executors, stages and Spark 
> history log. Only interesting values (peak values per stage per executor) are 
> recorded in the Spark history log, to minimize the amount of additional 
> logging.
> We have attached our design documentation with this ticket and would like to 
> receive feedback from the community for this proposal.






[jira] [Commented] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread liuxian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16440250#comment-16440250
 ] 

liuxian commented on SPARK-23989:
-

If we disable 'BypassMergeSortShuffleHandle' and 'SerializedShuffleHandle', a lot 
of unit tests in 'DataFrameAggregateSuite.scala' will fail.

> When using `SortShuffleWriter`, the data will be overwritten
> 
>
> Key: SPARK-23989
> URL: https://issues.apache.org/jira/browse/SPARK-23989
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Critical
>
> When using `SortShuffleWriter`, we only insert 'AnyRef' into 
> 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> override def write(records: Iterator[Product2[K, V]])
> the value of 'records' is `UnsafeRow`, so the value will be overwritten.
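The mechanism described above is the classic reused-mutable-record pitfall: the writer buffers a reference while the iterator keeps mutating the same object. A plain-Python analogy of the problem and of the usual defensive copy (no Spark classes involved, purely illustrative):

{code:python}
# Analogy for the overwrite described above: the iterator recycles one mutable
# object, so buffering references keeps only the last value, while copying
# before buffering preserves each record. This is not Spark's SortShuffleWriter.
def record_iterator():
    row = {}                 # a single mutable object reused for every record,
    for i in range(3):       # like a reused UnsafeRow
        row["value"] = i
        yield row

buffered_by_reference = [r for r in record_iterator()]
buffered_by_copy = [dict(r) for r in record_iterator()]

print(buffered_by_reference)  # [{'value': 2}, {'value': 2}, {'value': 2}]
print(buffered_by_copy)       # [{'value': 0}, {'value': 1}, {'value': 2}]
{code}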






[jira] [Assigned] (SPARK-22239) User-defined window functions with pandas udf

2018-04-16 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22239:


Assignee: (was: Apache Spark)

> User-defined window functions with pandas udf
> -
>
> Key: SPARK-22239
> URL: https://issues.apache.org/jira/browse/SPARK-22239
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: 
>Reporter: Li Jin
>Priority: Major
>
> Window functions are another place where we can benefit from vectorized UDFs and 
> add another useful function to the pandas_udf suite.
> Example usage (preliminary):
> {code:java}
> w = Window.partitionBy('id').orderBy('time').rangeBetween(-200, 0)
> @pandas_udf(DoubleType())
> def ema(v1):
>     return v1.ewm(alpha=0.5).mean().iloc[-1]
> df.withColumn('v1_ema', ema(df.v1).over(w))
> {code}






[jira] [Commented] (SPARK-22239) User-defined window functions with pandas udf

2018-04-16 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16440094#comment-16440094
 ] 

Apache Spark commented on SPARK-22239:
--

User 'icexelloss' has created a pull request for this issue:
https://github.com/apache/spark/pull/21082

> User-defined window functions with pandas udf
> -
>
> Key: SPARK-22239
> URL: https://issues.apache.org/jira/browse/SPARK-22239
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: 
>Reporter: Li Jin
>Priority: Major
>
> Window functions are another place where we can benefit from vectorized UDFs and 
> add another useful function to the pandas_udf suite.
> Example usage (preliminary):
> {code:java}
> w = Window.partitionBy('id').orderBy('time').rangeBetween(-200, 0)
> @pandas_udf(DoubleType())
> def ema(v1):
>     return v1.ewm(alpha=0.5).mean().iloc[-1]
> df.withColumn('v1_ema', ema(df.v1).over(w))
> {code}






[jira] [Assigned] (SPARK-22239) User-defined window functions with pandas udf

2018-04-16 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22239:


Assignee: Apache Spark

> User-defined window functions with pandas udf
> -
>
> Key: SPARK-22239
> URL: https://issues.apache.org/jira/browse/SPARK-22239
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: 
>Reporter: Li Jin
>Assignee: Apache Spark
>Priority: Major
>
> Window functions are another place where we can benefit from vectorized UDFs and 
> add another useful function to the pandas_udf suite.
> Example usage (preliminary):
> {code:java}
> w = Window.partitionBy('id').orderBy('time').rangeBetween(-200, 0)
> @pandas_udf(DoubleType())
> def ema(v1):
>     return v1.ewm(alpha=0.5).mean().iloc[-1]
> df.withColumn('v1_ema', ema(df.v1).over(w))
> {code}






[jira] [Commented] (SPARK-19618) Inconsistency wrt max. buckets allowed from Dataframe API vs SQL

2018-04-16 Thread Fernando Pereira (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16440085#comment-16440085
 ] 

Fernando Pereira commented on SPARK-19618:
--

Opened [SPARK-23997|https://issues.apache.org/jira/browse/SPARK-23997]

Thanks

> Inconsistency wrt max. buckets allowed from Dataframe API vs SQL
> 
>
> Key: SPARK-19618
> URL: https://issues.apache.org/jira/browse/SPARK-19618
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Tejas Patil
>Assignee: Tejas Patil
>Priority: Major
> Fix For: 2.2.0
>
>
> High number of buckets is allowed while creating a table via SQL query:
> {code}
> sparkSession.sql("""
> CREATE TABLE bucketed_table(col1 INT) USING parquet 
> CLUSTERED BY (col1) SORTED BY (col1) INTO 147483647 BUCKETS
> """)
> sparkSession.sql("DESC FORMATTED bucketed_table").collect.foreach(println)
> 
> [Num Buckets:,147483647,]
> [Bucket Columns:,[col1],]
> [Sort Columns:,[col1],]
> 
> {code}
> Trying the same via dataframe API does not work:
> {code}
> > df.write.format("orc").bucketBy(147483647, 
> > "j","k").sortBy("j","k").saveAsTable("bucketed_table")
> java.lang.IllegalArgumentException: requirement failed: Bucket number must be 
> greater than 0 and less than 10.
>   at scala.Predef$.require(Predef.scala:224)
>   at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$getBucketSpec$2.apply(DataFrameWriter.scala:293)
>   at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$getBucketSpec$2.apply(DataFrameWriter.scala:291)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.spark.sql.DataFrameWriter.getBucketSpec(DataFrameWriter.scala:291)
>   at 
> org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:429)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:410)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:365)
>   ... 50 elided
> {code}






[jira] [Created] (SPARK-23997) Configurable max number of buckets

2018-04-16 Thread Fernando Pereira (JIRA)
Fernando Pereira created SPARK-23997:


 Summary: Configurable max number of buckets
 Key: SPARK-23997
 URL: https://issues.apache.org/jira/browse/SPARK-23997
 Project: Spark
  Issue Type: Bug
  Components: Input/Output, SQL
Affects Versions: 2.3.0, 2.2.1
Reporter: Fernando Pereira


When exporting data as a table, the user can choose to split the data into buckets by 
choosing the columns and the number of buckets. Currently there is a hard-coded 
limit of 99,999 buckets.

However, for heavy workloads this limit might be too restrictive, a situation 
that will eventually become more common as workloads grow.

As per the comments in SPARK-19618 this limit could be made configurable.






[jira] [Commented] (SPARK-23638) Spark on k8s: spark.kubernetes.initContainer.image has no effect

2018-04-16 Thread Yinan Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16440067#comment-16440067
 ] 

Yinan Li commented on SPARK-23638:
--

Can this be closed?

> Spark on k8s: spark.kubernetes.initContainer.image has no effect
> 
>
> Key: SPARK-23638
> URL: https://issues.apache.org/jira/browse/SPARK-23638
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
> Environment: K8 server: Ubuntu 16.04
> Submission client: macOS Sierra 10.12.x
> Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.3", 
> GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean", 
> BuildDate:"2018-02-07T12:22:21Z", GoVersion:"go1.9.2", Compiler:"gc", 
> Platform:"darwin/amd64"}
> Server Version: version.Info{Major:"1", Minor:"8", GitVersion:"v1.8.3", 
> GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean", 
> BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc", 
> Platform:"linux/amd64"}
>Reporter: maheshvra
>Priority: Major
>
> Hi all - I am trying to use an initContainer to download remote dependencies. To 
> begin with, I ran a test with an initContainer that basically runs "echo hello 
> world". However, when I triggered the pod deployment via spark-submit, I did 
> not see any trace of initContainer execution in my Kubernetes cluster.
>  
> {code:java}
> SPARK_DRIVER_MEMORY: 1g 
> SPARK_DRIVER_CLASS: com.bigdata.App SPARK_DRIVER_ARGS: -c 
> /opt/spark/work-dir/app/main/environments/int -w 
> ./../../workflows/workflow_main.json -e prod -n features -v off 
> SPARK_DRIVER_BIND_ADDRESS:  
> SPARK_JAVA_OPT_0: -Dspark.submit.deployMode=cluster 
> SPARK_JAVA_OPT_1: -Dspark.driver.blockManager.port=7079 
> SPARK_JAVA_OPT_2: -Dspark.app.name=fg-am00-raw12 
> SPARK_JAVA_OPT_3: 
> -Dspark.kubernetes.container.image=docker.com/cmapp/fg-am00-raw:1.0.0 
> SPARK_JAVA_OPT_4: -Dspark.app.id=spark-4fa9a5ce1b1d401fa9c1e413ff030d44 
> SPARK_JAVA_OPT_5: 
> -Dspark.jars=/opt/spark/jars/aws-java-sdk-1.7.4.jar,/opt/spark/jars/hadoop-aws-2.7.3.jar,/opt/spark/jars/guava-14.0.1.jar,/opt/spark/jars/SparkApp.jar,/opt/spark/jars/datacleanup-component-1.0-SNAPSHOT.jar
>  
> SPARK_JAVA_OPT_6: -Dspark.driver.port=7078 
> SPARK_JAVA_OPT_7: 
> -Dspark.kubernetes.initContainer.image=docker.com/cmapp/custombusybox:1.0.0 
> SPARK_JAVA_OPT_8: 
> -Dspark.kubernetes.executor.podNamePrefix=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615
>  
> SPARK_JAVA_OPT_9: 
> -Dspark.kubernetes.driver.pod.name=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver
>  
> SPARK_JAVA_OPT_10: 
> -Dspark.driver.host=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver-svc.experimental.svc
>  SPARK_JAVA_OPT_11: -Dspark.executor.instances=5 
> SPARK_JAVA_OPT_12: 
> -Dspark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256 
> SPARK_JAVA_OPT_13: -Dspark.kubernetes.namespace=experimental 
> SPARK_JAVA_OPT_14: 
> -Dspark.kubernetes.authenticate.driver.serviceAccountName=experimental-service-account
>  SPARK_JAVA_OPT_15: -Dspark.master=k8s://https://bigdata
> {code}
>  
> Further, I did not see a spec.initContainers section in the generated pod. 
> Please see the details below
>  
> {code:java}
>  
> {
> "kind": "Pod",
> "apiVersion": "v1",
> "metadata": {
> "name": "fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver",
> "namespace": "experimental",
> "selfLink": 
> "/api/v1/namespaces/experimental/pods/fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver",
> "uid": "adc5a50a-2342-11e8-87dc-12c5b3954044",
> "resourceVersion": "299054",
> "creationTimestamp": "2018-03-09T02:36:32Z",
> "labels": {
> "spark-app-selector": "spark-4fa9a5ce1b1d401fa9c1e413ff030d44",
> "spark-role": "driver"
> },
> "annotations": {
> "spark-app-name": "fg-am00-raw12"
> }
> },
> "spec": {
> "volumes": [
> {
> "name": "experimental-service-account-token-msmth",
> "secret": {
> "secretName": "experimental-service-account-token-msmth",
> "defaultMode": 420
> }
> }
> ],
> "containers": [
> {
> "name": "spark-kubernetes-driver",
> "image": "docker.com/cmapp/fg-am00-raw:1.0.0",
> "args": [
> "driver"
> ],
> "env": [
> {
> "name": "SPARK_DRIVER_MEMORY",
> "value": "1g"
> },
> {
> "name": "SPARK_DRIVER_CLASS",
> "value": "com.myapp.App"
> },
> {
> "name": "SPARK_DRIVER_ARGS",
> "value": "-c /opt/spark/work-dir/app/main/environments/int -w 
> ./../../workflows/workflow_main.json -e prod -n features -v off"
> },
> {
> "name": "SPARK_DRIVER_BIND_ADDRESS",
> "valueFrom": {
> "fieldRef": {
> "apiVersion": "v1",
> "fieldPath": "status.podIP"
> }
> }
> },
> {
> "name": "SPARK_MOUNTED_CLASSPATH",
> "value": 
> 

[jira] [Resolved] (SPARK-23873) Use accessors in interpreted LambdaVariable

2018-04-16 Thread Herman van Hovell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herman van Hovell resolved SPARK-23873.
---
   Resolution: Fixed
 Assignee: Liang-Chi Hsieh
Fix Version/s: 2.4.0

> Use accessors in interpreted LambdaVariable
> ---
>
> Key: SPARK-23873
> URL: https://issues.apache.org/jira/browse/SPARK-23873
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.4.0
>
>
> Currently, interpreted execution of {{LambdaVariable}} just uses 
> {{InternalRow.get}} to access elements. We should use specified accessors if 
> possible.






[jira] [Commented] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-16 Thread Michel Davit (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439915#comment-16439915
 ] 

Michel Davit commented on SPARK-23986:
--

Thx [~mgaido]. I didn't have time to set up the environment to submit the pull 
request this weekend :)

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Priority: Major
> Attachments: spark-generated.java
>
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the 
> generated {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} calls 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}} 
> several times: the 1st time with the 'avg' Expr and a second time for the base 
> aggregation Exprs (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} map already contains {{agg_expr_[1..6]}} from the 1st call.
>  The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names:
>  {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name 
> conflict in the generated code: {{agg_expr_11}}.
> Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
> source of the conflict. Maybe simply using an underscore can solve this issue: 
> s"${fullName}_$id"
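A standalone re-implementation of the counter logic (in Python, purely to illustrate the collision and the suggested underscore separator; it is not Spark's CodeGenerator):

{code:python}
# Toy version of the freshName counter: "agg_expr_1" plus suffix "1" collides
# with a literal "agg_expr_11"; an explicit separator avoids it. Illustrative only.
def make_fresh_name(fresh_name_ids, separator=""):
    def fresh(name):
        if name in fresh_name_ids:
            next_id = fresh_name_ids[name]
            fresh_name_ids[name] = next_id + 1
            return f"{name}{separator}{next_id}"
        fresh_name_ids[name] = 1
        return name
    return fresh

for sep in ("", "_"):
    fresh = make_fresh_name({}, sep)
    first_call = [fresh(f"agg_expr_{i}") for i in range(1, 7)]    # 'avg' exprs
    second_call = [fresh(f"agg_expr_{i}") for i in range(1, 13)]  # count/sum exprs
    names = first_call + second_call
    duplicates = sorted({n for n in names if names.count(n) > 1})
    print(f"separator={sep!r} duplicates={duplicates}")
# separator='' reports a duplicate 'agg_expr_11'; separator='_' reports none.
{code}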






[jira] [Created] (SPARK-23996) Implement the optimal KLL algorithms for quantiles in streams

2018-04-16 Thread Timothy Hunter (JIRA)
Timothy Hunter created SPARK-23996:
--

 Summary: Implement the optimal KLL algorithms for quantiles in 
streams
 Key: SPARK-23996
 URL: https://issues.apache.org/jira/browse/SPARK-23996
 Project: Spark
  Issue Type: Improvement
  Components: MLlib, SQL
Affects Versions: 2.3.0
Reporter: Timothy Hunter


The current implementation for approximate quantiles - a variant of 
Greenwald-Khanna, which I implemented - is not the best in light of recent 
papers:

 - it is not exactly the one from the paper for performance reasons, but the 
changes are not documented beyond comments on the code

 - there are now more optimal algorithms with proven bounds (unlike q-digest, 
the other contender at the time)

I propose that we revisit the current implementation and look at the 
Karnin-Lang-Liberty algorithm (KLL) for example:
[https://arxiv.org/abs/1603.05346]

[https://edoliberty.github.io//papers/streamingQuantiles.pdf]

This algorithm seems to have favorable characteristics for streaming and a 
distributed implementation, and there is a Python implementation for reference.

It is a fairly standalone piece, and in that respect it is accessible to people who 
don't know too much about Spark internals.
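For context, the Greenwald-Khanna-based implementation under discussion is exposed through DataFrame.approxQuantile; a minimal PySpark usage sketch (the DataFrame and column below are made up):

{code:python}
# Minimal sketch of the existing approximate-quantile entry point whose internals
# this ticket proposes to revisit. The data and column name are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("approx-quantile-demo").getOrCreate()
df = spark.range(0, 1000000).withColumnRenamed("id", "latency_ms")

# median and p99 with a 1% relative error bound
print(df.approxQuantile("latency_ms", [0.5, 0.99], 0.01))
{code}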






[jira] [Updated] (SPARK-23975) Allow Clustering to take Arrays of Double as input features

2018-04-16 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley updated SPARK-23975:
--
Shepherd: Joseph K. Bradley

> Allow Clustering to take Arrays of Double as input features
> ---
>
> Key: SPARK-23975
> URL: https://issues.apache.org/jira/browse/SPARK-23975
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Lu Wang
>Priority: Major
>
> Clustering algorithms should accept Arrays in addition to Vectors as input 
> features. The Python interface should also be changed accordingly, which would 
> make PySpark a lot easier to use.
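Until such support lands, the usual workaround is to convert the array column into an ML Vector before calling a clustering estimator; a sketch of that status quo (column names are hypothetical):

{code:python}
# Sketch of the workaround this ticket wants to make unnecessary: clustering
# estimators expect a Vector column, so array<double> features are converted
# with a UDF first. Column names are hypothetical.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.appName("array-features-workaround").getOrCreate()
df = spark.createDataFrame(
    [(0, [0.0, 0.1]), (1, [0.2, 0.3]), (2, [9.0, 9.1]), (3, [9.2, 9.3])],
    ["id", "features_array"],
)

to_vector = udf(lambda xs: Vectors.dense(xs), VectorUDT())
with_vectors = df.withColumn("features", to_vector("features_array"))

model = KMeans(k=2, seed=1).fit(with_vectors)  # uses the default featuresCol "features"
print(model.clusterCenters())
{code}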






[jira] [Commented] (SPARK-23904) Big execution plan cause OOM

2018-04-16 Thread Ruben Berenguel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439815#comment-16439815
 ] 

Ruben Berenguel commented on SPARK-23904:
-

I'll give it a look, maybe there is a way to avoid it being generated when it 
is definitely not needed.

> Big execution plan cause OOM
> 
>
> Key: SPARK-23904
> URL: https://issues.apache.org/jira/browse/SPARK-23904
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Izek Greenfield
>Priority: Major
>  Labels: SQL, query
>
> I create a question in 
> [StackOverflow|https://stackoverflow.com/questions/49508683/spark-physicalplandescription-string-is-to-big]
>  
> Spark creates the text representation of the query in any case, even if I don't 
> need it.
> That creates many garbage objects and causes unneeded GC... 
>  [Gist with code to 
> reproduce|https://gist.github.com/igreenfield/584c3336f03ba7d63e9026774eaf5e23]
>  






[jira] [Assigned] (SPARK-23975) Allow Clustering to take Arrays of Double as input features

2018-04-16 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23975:


Assignee: Apache Spark

> Allow Clustering to take Arrays of Double as input features
> ---
>
> Key: SPARK-23975
> URL: https://issues.apache.org/jira/browse/SPARK-23975
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Lu Wang
>Assignee: Apache Spark
>Priority: Major
>
> Clustering algorithms should accept Arrays in addition to Vectors as input 
> features. The Python interface should also be changed accordingly, which would 
> make PySpark a lot easier to use.






[jira] [Assigned] (SPARK-23975) Allow Clustering to take Arrays of Double as input features

2018-04-16 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23975:


Assignee: (was: Apache Spark)

> Allow Clustering to take Arrays of Double as input features
> ---
>
> Key: SPARK-23975
> URL: https://issues.apache.org/jira/browse/SPARK-23975
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Lu Wang
>Priority: Major
>
> Clustering algorithms should accept Arrays in addition to Vectors as input 
> features. The Python interface should also be changed accordingly, which would 
> make PySpark a lot easier to use.






[jira] [Commented] (SPARK-23975) Allow Clustering to take Arrays of Double as input features

2018-04-16 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439793#comment-16439793
 ] 

Apache Spark commented on SPARK-23975:
--

User 'ludatabricks' has created a pull request for this issue:
https://github.com/apache/spark/pull/21081

> Allow Clustering to take Arrays of Double as input features
> ---
>
> Key: SPARK-23975
> URL: https://issues.apache.org/jira/browse/SPARK-23975
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Lu Wang
>Priority: Major
>
> Clustering algorithms should accept Arrays in addition to Vectors as input 
> features. The Python interface should also be changed accordingly, which would 
> make PySpark a lot easier to use.






[jira] [Updated] (SPARK-23995) initial job has not accept any resources and executor keep exit

2018-04-16 Thread Cong Shen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cong Shen updated SPARK-23995:
--
Environment: 
Spark version:2.3.0

JDK version: 1.8.0_131

System: CentOS v7

{{export JAVA_HOME=/usr/java/jdk1.8.0_144 }}

{{export SPARK_MASTER_IP=IP }}

{{export PYSPARK_PYTHON=/opt/anaconda3/bin/python }}

{{export SPARK_WORKER_MEMORY=2g }}

{{export SPARK_WORK_INSTANCES=1 }}

{{export SPARK_WORkER_CORES=4 export SPARK_EXECUTOR_MEMORY=1g}}

 

The firewalls are stopped.

  was:
Spark version:2.3.0

JDK version: 1.8.0_131

System: CentOS v7

{{export JAVA_HOME=/usr/java/jdk1.8.0_144 }}

{{export SPARK_MASTER_IP=IP export PYSPARK_PYTHON=/opt/anaconda3/bin/python 
export SPARK_WORKER_MEMORY=2g export SPARK_WORK_INSTANCES=1 export 
SPARK_WORkER_CORES=4 export SPARK_EXECUTOR_MEMORY=1g}}

 

The firewalls are stopped.


> initial job has not accept any resources and executor keep exit
> ---
>
> Key: SPARK-23995
> URL: https://issues.apache.org/jira/browse/SPARK-23995
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy
>Affects Versions: 2.3.0
> Environment: Spark version:2.3.0
> JDK version: 1.8.0_131
> System: CentOS v7
> {{export JAVA_HOME=/usr/java/jdk1.8.0_144 }}
> {{export SPARK_MASTER_IP=IP }}
> {{export PYSPARK_PYTHON=/opt/anaconda3/bin/python }}
> {{export SPARK_WORKER_MEMORY=2g }}
> {{export SPARK_WORK_INSTANCES=1 }}
> {{export SPARK_WORkER_CORES=4 export SPARK_EXECUTOR_MEMORY=1g}}
>  
> The firewalls are stopped.
>Reporter: Cong Shen
>Priority: Major
>  Labels: executor, standalone,
>
> I have a Spark cluster using cloud resources on two instances, one as master 
> and one as worker. The total resources are 4 cores and 10 GB of RAM. I can start 
> the shell, and the worker registers successfully, but things fail when I run 
> simple code. The error from the shell is:
> TaskSchedulerImpl:66 - Initial job has not accept any resources.
> master log:
>  
> {code:java}
> // code placeholder
> 2018-04-12 13:09:14 INFO Master:54 - Registering app Spark shell 2018-04-12 
> 13:09:14 INFO Master:54 - Registered app Spark shell with ID 
> app-20180412130914- 2018-04-12 13:09:14 INFO Master:54 - Launching 
> executor app-20180412130914-/0 on worker 
> worker-20180411144020-192.**.**.**-44986 2018-04-12 13:11:15 INFO Master:54 - 
> Removing executor app-20180412130914-/0 because it is EXITED 2018-04-12 
> 13:11:15 INFO Master:54 - Launching executor app-20180412130914-/1 on 
> worker worker-20180411144020-192.**.**.**-44986 2018-04-12 13:13:16 INFO 
> Master:54 - Removing executor app-20180412130914-/1 because it is EXITED 
> 2018-04-12 13:13:16 INFO Master:54 - Launching executor 
> app-20180412130914-/2 on worker worker-20180411144020-192.**.**.**-44986 
> 2018-04-12 13:15:17 INFO Master:54 - Removing executor 
> app-20180412130914-/2 because it is EXITED 2018-04-12 13:15:17 INFO 
> Master:54 - Launching executor app-20180412130914-/3 on worker 
> worker-20180411144020-192.**.**.**-44986 2018-04-12 13:16:15 INFO Master:54 - 
> Removing app app-20180412130914- 2018-04-12 13:16:15 INFO Master:54 - 
> 192.**.**.**:39766 got disassociated, removing it. 2018-04-12 13:16:15 INFO 
> Master:54 - IP:39928 got disassociated, removing it. 2018-04-12 13:16:15 WARN 
> Master:66 - Got status update for unknown executor app-20180412130914-/3
> {code}
> Worker log:
>  
> {code:java}
> // code placeholder
> 2018-04-12 13:09:12 INFO  Worker:54 - Asked to launch executor
> app-20180412130914-/0 for Spark shell
> 2018-04-12 13:09:12 INFO  SecurityManager:54 - Changing view acls to: root
> 2018-04-12 13:09:12 INFO  SecurityManager:54 - Changing modify acls to: root
> 2018-04-12 13:09:12 INFO  SecurityManager:54 - Changing view acls groups to: 
> 2018-04-12 13:09:12 INFO  SecurityManager:54 - Changing modify acls groups  
> to: 
> 2018-04-12 13:09:12 INFO  SecurityManager:54 - SecurityManager: 
> authentication disabled; ui acls disabled; users  with view permissions:
> Set(root); groups with view permissions: Set(); users  with modify 
> permissions: Set(root); groups with modify permissions: Set()
> 2018-04-12 13:09:12 INFO  ExecutorRunner:54 - Launch command: 
> "/usr/java/jdk1.8.0_144/bin/java" "-cp" 
> "/opt/spark-2.3.0-bin-hadoop2.7/conf/:/opt/spark-2.3.0-bin-hadoop2.7/jars/*" 
> "-Xmx1024M" "-Dspark.driver.port=39928" 
> "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" 
> "spark://CoarseGrainedScheduler@IP:39928" "--executor-id" "0" "--hostname" 
> "192.**.**.**" "--cores" "4" "--app-id" "app-20180412130914-" 
> "--worker-url" "spark://Worker@192.**.**.**:44986"
> 2018-04-12 13:11:13 INFO  Worker:54 - Executor app-20180412130914-/0 
> finished with state EXITED message Command exited with code 1 exitStatus 1

[jira] [Commented] (SPARK-23030) Decrease memory consumption with toPandas() collection using Arrow

2018-04-16 Thread Bryan Cutler (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439727#comment-16439727
 ] 

Bryan Cutler commented on SPARK-23030:
--

Hi [~icexelloss], I have something working, just need to write it up then we 
can discuss on the PR.  You're right though, if we want to keep collection as 
fast as possible, it must be fully asynchronous.  Then unfortunately there is 
no way to avoid the worst case of having all data in the JVM driver memory.  I 
did improve the average case and got a little speedup, so hopefully it will be 
worth it.  I'll put up a PR soon.

> Decrease memory consumption with toPandas() collection using Arrow
> --
>
> Key: SPARK-23030
> URL: https://issues.apache.org/jira/browse/SPARK-23030
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark, SQL
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Major
>
> Currently with Arrow enabled, calling {{toPandas()}} results in a collection 
> of all partitions in the JVM in the form of batches in the Arrow file format.  
> Once collected in the JVM, they are served to the Python driver process. 
> I believe using the Arrow stream format can help to optimize this and reduce 
> memory consumption in the JVM by only loading one record batch at a time 
> before sending it to Python.  This might also reduce the latency between 
> making the initial call in Python and receiving the first batch of records.
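For reference, the Arrow collection path discussed here is toggled by a SQL conf; a minimal usage sketch, assuming the key name used in the 2.3 line (newer releases renamed it):

{code:python}
# Minimal sketch: enable the Arrow-based toPandas() path discussed above.
# The conf key below is the 2.3-era name; check the docs of the version in use.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-topandas-demo").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = spark.range(0, 1000000).toPandas()  # collected as Arrow record batches
print(len(pdf))
{code}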






[jira] [Created] (SPARK-23995) initial job has not accept any resources and executor keep exit

2018-04-16 Thread Cong Shen (JIRA)
Cong Shen created SPARK-23995:
-

 Summary: initial job has not accept any resources and executor 
keep exit
 Key: SPARK-23995
 URL: https://issues.apache.org/jira/browse/SPARK-23995
 Project: Spark
  Issue Type: Bug
  Components: Deploy
Affects Versions: 2.3.0
 Environment: Spark version:2.3.0

JDK version: 1.8.0_131

System: CentOS v7

{{export JAVA_HOME=/usr/java/jdk1.8.0_144 }}

{{export SPARK_MASTER_IP=IP export PYSPARK_PYTHON=/opt/anaconda3/bin/python 
export SPARK_WORKER_MEMORY=2g export SPARK_WORK_INSTANCES=1 export 
SPARK_WORkER_CORES=4 export SPARK_EXECUTOR_MEMORY=1g}}

 

The firewalls are stopped.
Reporter: Cong Shen


I have a Spark cluster using cloud resources on two instances, one as master and 
one as worker. The total resources are 4 cores and 10 GB of RAM. I can start the 
shell, and the worker registers successfully, but things fail when I run simple code.

The error from the shell is:

TaskSchedulerImpl:66 - Initial job has not accept any resources.

master log:

 
{code:java}
// code placeholder
2018-04-12 13:09:14 INFO Master:54 - Registering app Spark shell 2018-04-12 
13:09:14 INFO Master:54 - Registered app Spark shell with ID 
app-20180412130914- 2018-04-12 13:09:14 INFO Master:54 - Launching executor 
app-20180412130914-/0 on worker worker-20180411144020-192.**.**.**-44986 
2018-04-12 13:11:15 INFO Master:54 - Removing executor 
app-20180412130914-/0 because it is EXITED 2018-04-12 13:11:15 INFO 
Master:54 - Launching executor app-20180412130914-/1 on worker 
worker-20180411144020-192.**.**.**-44986 2018-04-12 13:13:16 INFO Master:54 - 
Removing executor app-20180412130914-/1 because it is EXITED 2018-04-12 
13:13:16 INFO Master:54 - Launching executor app-20180412130914-/2 on 
worker worker-20180411144020-192.**.**.**-44986 2018-04-12 13:15:17 INFO 
Master:54 - Removing executor app-20180412130914-/2 because it is EXITED 
2018-04-12 13:15:17 INFO Master:54 - Launching executor 
app-20180412130914-/3 on worker worker-20180411144020-192.**.**.**-44986 
2018-04-12 13:16:15 INFO Master:54 - Removing app app-20180412130914- 
2018-04-12 13:16:15 INFO Master:54 - 192.**.**.**:39766 got disassociated, 
removing it. 2018-04-12 13:16:15 INFO Master:54 - IP:39928 got disassociated, 
removing it. 2018-04-12 13:16:15 WARN Master:66 - Got status update for unknown 
executor app-20180412130914-/3

{code}
Worker log:

 
{code:java}
// code placeholder

2018-04-12 13:09:12 INFO  Worker:54 - Asked to launch executor
app-20180412130914-/0 for Spark shell
2018-04-12 13:09:12 INFO  SecurityManager:54 - Changing view acls to: root
2018-04-12 13:09:12 INFO  SecurityManager:54 - Changing modify acls to: root
2018-04-12 13:09:12 INFO  SecurityManager:54 - Changing view acls groups to: 
2018-04-12 13:09:12 INFO  SecurityManager:54 - Changing modify acls groups  to: 
2018-04-12 13:09:12 INFO  SecurityManager:54 - SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions:Set(root); groups 
with view permissions: Set(); users  with modify permissions: Set(root); groups 
with modify permissions: Set()
2018-04-12 13:09:12 INFO  ExecutorRunner:54 - Launch command: 
"/usr/java/jdk1.8.0_144/bin/java" "-cp" 
"/opt/spark-2.3.0-bin-hadoop2.7/conf/:/opt/spark-2.3.0-bin-hadoop2.7/jars/*" 
"-Xmx1024M" "-Dspark.driver.port=39928" 
"org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" 
"spark://CoarseGrainedScheduler@IP:39928" "--executor-id" "0" "--hostname" 
"192.**.**.**" "--cores" "4" "--app-id" "app-20180412130914-" 
"--worker-url" "spark://Worker@192.**.**.**:44986"
2018-04-12 13:11:13 INFO  Worker:54 - Executor app-20180412130914-/0 
finished with state EXITED message Command exited with code 1 exitStatus 1
2018-04-12 13:11:13 INFO  Worker:54 - Asked to launch executor 
app-20180412130914-/1 for Spark shell
2018-04-12 13:11:13 INFO  SecurityManager:54 - Changing view acls to: root
2018-04-12 13:11:13 INFO  SecurityManager:54 - Changing modify acls to: root
2018-04-12 13:11:13 INFO  SecurityManager:54 - Changing view acls groups to: 
2018-04-12 13:11:13 INFO  SecurityManager:54 - Changing modify acls groups to: 
2018-04-12 13:11:13 INFO  SecurityManager:54 - SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(root); groups 
with view permissions: Set(); users  with modify permissions: Set(root); groups 
with modify permissions: Set()
2018-04-12 13:11:13 INFO  ExecutorRunner:54 - Launch command: 
"/usr/java/jdk1.8.0_144/bin/java" "-cp" 
"/opt/spark-2.3.0-bin-hadoop2.7/conf/:/opt/spark-2.3.0-bin-hadoop2.7/jars/*" 
"-Xmx1024M" "-Dspark.driver.port=39928" 
"org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" 
"spark://CoarseGrainedScheduler@spark-master.novalocal:39928" "--executor-id" 
"1" "--hostname" "192.**.**.**" "--cores" "4" "--app-id" 

[jira] [Resolved] (SPARK-21088) CrossValidator, TrainValidationSplit should collect all models when fitting: Python API

2018-04-16 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley resolved SPARK-21088.
---
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 19627
[https://github.com/apache/spark/pull/19627]

> CrossValidator, TrainValidationSplit should collect all models when fitting: 
> Python API
> ---
>
> Key: SPARK-21088
> URL: https://issues.apache.org/jira/browse/SPARK-21088
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, PySpark
>Affects Versions: 2.2.0
>Reporter: Joseph K. Bradley
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 2.4.0
>
>
> In pyspark:
> We add a parameter controlling whether to collect the full model list during 
> CrossValidator/TrainValidationSplit training (the default is NOT to collect, to 
> avoid the change causing OOM).
> Add a method in CrossValidatorModel/TrainValidationSplitModel that allows the 
> user to get the model list.
> CrossValidatorModelWriter adds an "option" that allows the user to control 
> whether to persist the model list to disk.
> Note: when persisting the model list, use indices as the sub-model path.
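A hedged sketch of how the Python API described above might be used, assuming the parameter and attribute land under the same names as on the Scala side (collectSubModels on the validator, subModels on the fitted model):

{code:python}
# Hedged sketch only: the collectSubModels parameter and subModels attribute are
# assumed to mirror the Scala API; the toy data is made up.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cv-submodels-demo").getOrCreate()
train = spark.createDataFrame(
    [(Vectors.dense([float(i)]), float(i % 2)) for i in range(8)],
    ["features", "label"],
)

lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1]).build()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=2, collectSubModels=True)

cv_model = cv.fit(train)
# one list of sub-models per fold, one entry per param-map combination
print(len(cv_model.subModels), len(cv_model.subModels[0]))
{code}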






[jira] [Assigned] (SPARK-21088) CrossValidator, TrainValidationSplit should collect all models when fitting: Python API

2018-04-16 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley reassigned SPARK-21088:
-

Assignee: Weichen Xu

> CrossValidator, TrainValidationSplit should collect all models when fitting: 
> Python API
> ---
>
> Key: SPARK-21088
> URL: https://issues.apache.org/jira/browse/SPARK-21088
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, PySpark
>Affects Versions: 2.2.0
>Reporter: Joseph K. Bradley
>Assignee: Weichen Xu
>Priority: Major
>
> In pyspark:
> We add a parameter controlling whether to collect the full model list during 
> CrossValidator/TrainValidationSplit training (the default is NOT to collect, to 
> avoid the change causing OOM).
> Add a method in CrossValidatorModel/TrainValidationSplitModel that allows the 
> user to get the model list.
> CrossValidatorModelWriter adds an "option" that allows the user to control 
> whether to persist the model list to disk.
> Note: when persisting the model list, use indices as the sub-model path.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-9312) The OneVsRest model does not provide rawPrediction

2018-04-16 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley updated SPARK-9312:
-
Summary: The OneVsRest model does not provide rawPrediction  (was: The 
OneVsRest model does not provide confidence factor(not probability) along with 
the prediction)

> The OneVsRest model does not provide rawPrediction
> --
>
> Key: SPARK-9312
> URL: https://issues.apache.org/jira/browse/SPARK-9312
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, MLlib
>Affects Versions: 1.4.0, 1.4.1
>Reporter: Badari Madhav
>Assignee: Lu Wang
>Priority: Major
>  Labels: features
> Fix For: 2.4.0
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-9312) The OneVsRest model does not provide confidence factor(not probability) along with the prediction

2018-04-16 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley reassigned SPARK-9312:


Assignee: Lu Wang

> The OneVsRest model does not provide confidence factor(not probability) along 
> with the prediction
> -
>
> Key: SPARK-9312
> URL: https://issues.apache.org/jira/browse/SPARK-9312
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, MLlib
>Affects Versions: 1.4.0, 1.4.1
>Reporter: Badari Madhav
>Assignee: Lu Wang
>Priority: Major
>  Labels: features
> Fix For: 2.4.0
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-9312) The OneVsRest model does not provide confidence factor(not probability) along with the prediction

2018-04-16 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley resolved SPARK-9312.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21044
[https://github.com/apache/spark/pull/21044]

> The OneVsRest model does not provide confidence factor(not probability) along 
> with the prediction
> -
>
> Key: SPARK-9312
> URL: https://issues.apache.org/jira/browse/SPARK-9312
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, MLlib
>Affects Versions: 1.4.0, 1.4.1
>Reporter: Badari Madhav
>Assignee: Lu Wang
>Priority: Major
>  Labels: features
> Fix For: 2.4.0
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23981) ShuffleBlockFetcherIterator - Spamming Logs

2018-04-16 Thread BELUGA BEHR (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439614#comment-16439614
 ] 

BELUGA BEHR commented on SPARK-23981:
-

Or maybe lower the per-block logging to debug level and produce one overall log 
message if the fetches cannot be completed.

> ShuffleBlockFetcherIterator - Spamming Logs
> ---
>
> Key: SPARK-23981
> URL: https://issues.apache.org/jira/browse/SPARK-23981
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.3.0
>Reporter: BELUGA BEHR
>Priority: Major
>
> If a remote host shuffle service fails, Spark Executors produce a huge amount 
> of logging.
> {code:java}
> 2018-04-10 20:24:44,834 INFO  [Block Fetch Retry-1] 
> shuffle.RetryingBlockFetcher (RetryingBlockFetcher.java:initiateRetry(163)) - 
> Retrying fetch (3/3) for 1753 outstanding blocks after 5000 ms
> 2018-04-10 20:24:49,865 ERROR [Block Fetch Retry-1] 
> storage.ShuffleBlockFetcherIterator (Logging.scala:logError(95)) - Failed to 
> get block(s) from myhost.local:7337
> java.io.IOException: Failed to connect to myhost.local/10.11.12.13:7337
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.ConnectException: Connection refused: 
> myhost.local/12.13.14.15:7337
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>   at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   ... 1 more
> {code}
>  
> We can see from the code that if a block fetch fails, a "listener" is 
> updated once for each block. From the error messages above, it can be seen 
> that 1753 blocks were being fetched. However, since the remote host has 
> become unavailable, they all fail and every block is reported individually.
>  
> {code:java|title=RetryingBlockFetcher.java}
>   if (shouldRetry(e)) {
> initiateRetry();
>   } else {
> for (String bid : blockIdsToFetch) {
>   listener.onBlockFetchFailure(bid, e);
> }
>   }
> {code}
> {code:java|title=ShuffleBlockFetcherIterator.scala}
> override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
> logError(s"Failed to get block(s) from 
> ${req.address.host}:${req.address.port}", e)
> results.put(new FailureFetchResult(BlockId(blockId), address, e))
>   }
> {code}
> So what we get here is 1753 ERROR stack traces in the log, all printing 
> the same message:
> {quote}Failed to get block(s) from myhost.local:7337
>  ...
> {quote}
> Perhaps it would be better if the signature of {{onBlockFetchFailure}} were 
> changed to accept an entire Collection of block IDs instead of being invoked 
> once per block.
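
A hypothetical sketch of that proposal (a standalone illustration; the trait 
and types below are simplified stand-ins, not the actual Spark listener API):

{code:scala}
// Simplified stand-in for the real listener: the failure callback receives the
// whole batch of failed block IDs, so one failed request produces one ERROR entry.
trait BlockFetchListener {
  def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit
  def onBlockFetchFailure(blockIds: Seq[String], e: Throwable): Unit
}

class OneLinePerRequestListener(host: String, port: Int) extends BlockFetchListener {
  override def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit = ()

  override def onBlockFetchFailure(blockIds: Seq[String], e: Throwable): Unit = {
    // One summary line instead of one stack trace per block.
    println(s"Failed to get ${blockIds.size} block(s) from $host:$port: ${e.getMessage}")
  }
}
{code}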



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23994) Add Host To Blacklist If Shuffle Cannot Complete

2018-04-16 Thread BELUGA BEHR (JIRA)
BELUGA BEHR created SPARK-23994:
---

 Summary: Add Host To Blacklist If Shuffle Cannot Complete
 Key: SPARK-23994
 URL: https://issues.apache.org/jira/browse/SPARK-23994
 Project: Spark
  Issue Type: Improvement
  Components: Block Manager, Shuffle
Affects Versions: 2.3.0
Reporter: BELUGA BEHR


If a node cannot be reached for shuffling data, add the node to the blacklist 
and retry the current stage.

{code:java}
2018-04-10 20:25:55,065 ERROR [Block Fetch Retry-3] 
shuffle.RetryingBlockFetcher 
(RetryingBlockFetcher.java:fetchAllOutstanding(142)) - Exception while 
beginning fetch of 711 outstanding blocks (after 3 retries)
java.io.IOException: Failed to connect to host.local/10.11.12.13:7337
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at 
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: 
host.local/10.11.12.13:7337
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
{code}
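
As a point of reference, blacklisting already has application-level knobs that 
overlap with this request. A minimal sketch, assuming the property names below 
are valid for your Spark version (check the configuration documentation rather 
than taking them from this ticket):

{code:scala}
import org.apache.spark.SparkConf

// Assumed configuration keys -- verify against your version's documentation.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  // Assumed: blacklist a node for the whole application when fetch failures
  // point at its external shuffle service.
  .set("spark.blacklist.application.fetchFailure.enabled", "true")
{code}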



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-16 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23986:


Assignee: Apache Spark

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Assignee: Apache Spark
>Priority: Major
> Attachments: spark-generated.java
>
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the 
> generated {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} calls several times 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. 
> The 1st time with the 'avg' Expr and a second time for the base aggregation 
> Expr (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
>  The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names:
> {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name 
> conflict in the generated code: {{agg_expr_11}}.
> Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
> source of the conflict. Maybe simply using an underscore can solve this 
> issue: s"${fullName}_$id"
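
A minimal, standalone sketch of the fix direction suggested above (not the 
actual Spark {{CodegenContext}}): separating the base name from the numeric 
suffix with an underscore keeps a generated name such as {{agg_expr_1}} plus 
id 1 from colliding with a literal {{agg_expr_11}}.

{code:scala}
import scala.collection.mutable

// Standalone sketch: underscore-separated suffixes avoid accidental collisions.
object FreshNames {
  private val freshNameIds = mutable.HashMap.empty[String, Int]
  private val freshNamePrefix = ""

  def freshName(name: String): String = synchronized {
    val fullName = if (freshNamePrefix == "") name else s"${freshNamePrefix}_$name"
    if (freshNameIds.contains(fullName)) {
      val id = freshNameIds(fullName)
      freshNameIds(fullName) = id + 1
      s"${fullName}_$id"   // was s"$fullName$id", which could clash
    } else {
      freshNameIds += fullName -> 1
      fullName
    }
  }
}
{code}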



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-16 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23986:


Assignee: (was: Apache Spark)

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Priority: Major
> Attachments: spark-generated.java
>
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the 
> generated {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} calls several times 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. 
> The 1st time with the 'avg' Expr and a second time for the base aggregation 
> Expr (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
>  The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names:
> {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name 
> conflict in the generated code: {{agg_expr_11}}.
> Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
> source of the conflict. Maybe simply using an underscore can solve this 
> issue: s"${fullName}_$id"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-16 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439417#comment-16439417
 ] 

Apache Spark commented on SPARK-23986:
--

User 'mgaido91' has created a pull request for this issue:
https://github.com/apache/spark/pull/21080

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Priority: Major
> Attachments: spark-generated.java
>
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the 
> generated {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} calls several times 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. 
> The 1st time with the 'avg' Expr and a second time for the base aggregation 
> Expr (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
>  The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names:
> {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name 
> conflict in the generated code: {{agg_expr_11}}.
> Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
> source of the conflict. Maybe simply using an underscore can solve this 
> issue: s"${fullName}_$id"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-16 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439398#comment-16439398
 ] 

Marco Gaido commented on SPARK-23986:
-

[~RustedBones] I was able to reproduce this. Yes, I agree with all of your 
analysis and also with your proposed solution. I am submitting a patch. 
Thanks for reporting this.

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Priority: Major
> Attachments: spark-generated.java
>
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the 
> generated {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} calls several times 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. 
> The 1st time with the 'avg' Expr and a second time for the base aggregation 
> Expr (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
>  The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names:
> {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name 
> conflict in the generated code: {{agg_expr_11}}.
> Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
> source of the conflict. Maybe simply using an underscore can solve this 
> issue: s"${fullName}_$id"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread Wenchen Fan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439384#comment-16439384
 ] 

Wenchen Fan commented on SPARK-23989:
-

do you have an end-to-end case to show this bug? IIRC we always copy the unsafe 
row before sending it to something like `SortShuffleWriter`.

> When using `SortShuffleWriter`, the data will be overwritten
> 
>
> Key: SPARK-23989
> URL: https://issues.apache.org/jira/browse/SPARK-23989
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Critical
>
> When using `SortShuffleWriter`, we only insert 'AnyRef' into 
> 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> {{override def write(records: Iterator[Product2[K, V]])}}
> the value of 'records' is `UnsafeRow`, so the value will be overwritten.
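
To make the concern concrete, a generic sketch of the object-reuse hazard 
described here (illustrative only, not Spark internals): if an iterator reuses 
one mutable record and the consumer buffers the reference without copying, 
every buffered entry ends up holding the last value.

{code:scala}
// Illustrative only: buffering a reused mutable record vs. copying it first.
final class Record(var value: Int)

val reused = new Record(0)
val it = Iterator.tabulate(3) { i => reused.value = i; reused } // one shared object

val buffered = it.toArray                    // three references to the same record
println(buffered.map(_.value).toList)        // List(2, 2, 2) -- earlier values lost

val reused2 = new Record(0)
val it2 = Iterator.tabulate(3) { i => reused2.value = i; reused2 }
val copied = it2.map(r => new Record(r.value)).toArray // defensive copy per element
println(copied.map(_.value).toList)          // List(0, 1, 2)
{code}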



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23993) Support DESC FORMATTED table_name column_name

2018-04-16 Thread Volodymyr Glushak (JIRA)
Volodymyr Glushak created SPARK-23993:
-

 Summary: Support DESC FORMATTED table_name column_name
 Key: SPARK-23993
 URL: https://issues.apache.org/jira/browse/SPARK-23993
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.1.2
Reporter: Volodymyr Glushak


Hive and Spark both support:
{code}
DESC FORMATTED table_name{code}
which gives table metadata.
If you want to get metadata for a particular column in Hive, you can execute:
{code}
DESC FORMATTED table_name column_name{code}
This is not supported in Spark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23901) Data Masking Functions

2018-04-16 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437401#comment-16437401
 ] 

Marco Gaido edited comment on SPARK-23901 at 4/16/18 12:19 PM:
---

Actually I am facing some issues in the implementation. I have a couple of 
questions:
 1 - In the mask function, Hive accepts only constant values as parameters 
(other than the main string to replace). Shall we enforce this in Spark too?
 2 - Although the documentation says these methods accept strings as 
parameters, they actually allow basically any type, and each type is handled 
differently. Shall we reproduce the same Hive behavior, or shall we support 
only String?
 3 - Moreover, and this is connected to point 2, Hive accepts many more 
parameters than the ones in the documentation; shall we support them too?



was (Author: mgaido):
Actually I am facing some issues in the implementation. I have a couple of 
questions:
 1 - In the mask function Hive accepts only constant values as parameters 
(other than the main string to replace). Shall we enforce this in Spark too?
 2 - Despite in the documentation these methods are said to accept strings as 
parameters, actually they allow basically any type and any type is considered 
differently. Shall we reproduce the same Hive behavior or shall we support only 
String?


> Data Masking Functions
> --
>
> Key: SPARK-23901
> URL: https://issues.apache.org/jira/browse/SPARK-23901
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> - mask()
>  - mask_first_n()
>  - mask_last_n()
>  - mask_hash()
>  - mask_show_first_n()
>  - mask_show_last_n()
> Reference:
> [1] 
> [https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DataMaskingFunctions]
> [2] https://issues.apache.org/jira/browse/HIVE-13568
>  
>  
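
For orientation on the questions above, a minimal sketch of the masking 
semantics under discussion, assuming the Hive convention of mapping upper-case 
letters to 'X', lower-case letters to 'x', and digits to 'n' (an illustration 
only, not the eventual Spark implementation):

{code:scala}
// Illustrative only: assumed Hive-style masking convention.
def mask(s: String): String = s.map {
  case c if c.isUpper => 'X'
  case c if c.isLower => 'x'
  case c if c.isDigit => 'n'
  case c              => c
}

def maskShowFirstN(s: String, n: Int): String = s.take(n) + mask(s.drop(n))

// mask("Card-1234")              == "Xxxx-nnnn"
// maskShowFirstN("Card-1234", 4) == "Card-nnnn"
{code}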



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23901) Data Masking Functions

2018-04-16 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439354#comment-16439354
 ] 

Marco Gaido commented on SPARK-23901:
-

[~ueshin] maybe you have some inputs on my questions above.

> Data Masking Functions
> --
>
> Key: SPARK-23901
> URL: https://issues.apache.org/jira/browse/SPARK-23901
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> - mask()
>  - mask_first_n()
>  - mask_last_n()
>  - mask_hash()
>  - mask_show_first_n()
>  - mask_show_last_n()
> Reference:
> [1] 
> [https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DataMaskingFunctions]
> [2] https://issues.apache.org/jira/browse/HIVE-13568
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23992) ShuffleDependency does not need to be deserialized every time

2018-04-16 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439349#comment-16439349
 ] 

Apache Spark commented on SPARK-23992:
--

User '10110346' has created a pull request for this issue:
https://github.com/apache/spark/pull/21079

> ShuffleDependency does not need to be deserialized every time
> -
>
> Key: SPARK-23992
> URL: https://issues.apache.org/jira/browse/SPARK-23992
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Minor
>
> In the same stage, 'ShuffleDependency' does not need to be deserialized 
> each time.
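
A generic sketch of the idea (illustrative only, not the actual executor code): 
cache the deserialized object per stage so that tasks of the same stage reuse 
it instead of deserializing it again.

{code:scala}
import scala.collection.mutable

// Illustrative memoization of an expensive deserialization, keyed by stage id.
object DeserializedCache {
  private val cache = mutable.HashMap.empty[Int, AnyRef]

  def getOrDeserialize[T <: AnyRef](stageId: Int)(deserialize: => T): T =
    cache.synchronized {
      cache.getOrElseUpdate(stageId, deserialize).asInstanceOf[T]
    }
}

// Usage sketch (names are hypothetical):
// val dep = DeserializedCache.getOrDeserialize(stageId) { deserializeShuffleDep(bytes) }
{code}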



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23992) ShuffleDependency does not need to be deserialized every time

2018-04-16 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23992:


Assignee: Apache Spark

> ShuffleDependency does not need to be deserialized every time
> -
>
> Key: SPARK-23992
> URL: https://issues.apache.org/jira/browse/SPARK-23992
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Assignee: Apache Spark
>Priority: Minor
>
> In the same stage, 'ShuffleDependency' does not need to be deserialized 
> each time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23992) ShuffleDependency does not need to be deserialized every time

2018-04-16 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23992:


Assignee: (was: Apache Spark)

> ShuffleDependency does not need to be deserialized every time
> -
>
> Key: SPARK-23992
> URL: https://issues.apache.org/jira/browse/SPARK-23992
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Minor
>
> In the same stage, 'ShuffleDependency' does not need to be deserialized 
> each time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23992) ShuffleDependency does not need to be deserialized every time

2018-04-16 Thread liuxian (JIRA)
liuxian created SPARK-23992:
---

 Summary: ShuffleDependency does not need to be deserialized every 
time
 Key: SPARK-23992
 URL: https://issues.apache.org/jira/browse/SPARK-23992
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: liuxian


In the same stage, 'ShuffleDependency' does not need to be deserialized each 
time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-16 Thread Michel Davit (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michel Davit updated SPARK-23986:
-
Description: 
Considering the following code:
{code:java}
val df1: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
  .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")

val df2: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, "val1", "val2")))
  .toDF("key", "dummy1", "dummy2")

val agg = df1
  .join(df2, df1("key") === df2("key"), "leftouter")
  .groupBy(df1("key"))
  .agg(
avg("col2").as("avg2"),
avg("col3").as("avg3"),
avg("col4").as("avg4"),
avg("col1").as("avg1"),
avg("col5").as("avg5"),
avg("col6").as("avg6")
  )

val head = agg.take(1)
{code}
This logs the following exception:
{code:java}
ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
467, Column 28: Redefinition of parameter "agg_expr_11"
{code}
I am not a Spark expert, but after investigation I realized that the generated 
{{doConsume}} method is responsible for the exception.

Indeed, {{avg}} calls several times 
{{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. 
The 1st time with the 'avg' Expr and a second time for the base aggregation 
Expr (count and sum).

The problem comes from the generation of parameters in CodeGenerator:
{code:java}
  /**
   * Returns a term name that is unique within this instance of a 
`CodegenContext`.
   */
  def freshName(name: String): String = synchronized {
val fullName = if (freshNamePrefix == "") {
  name
} else {
  s"${freshNamePrefix}_$name"
}
if (freshNameIds.contains(fullName)) {
  val id = freshNameIds(fullName)
  freshNameIds(fullName) = id + 1
  s"$fullName$id"
} else {
  freshNameIds += fullName -> 1
  fullName
}
  }
{code}
The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
 The second call is made with {{agg_expr_[1..12]}} and generates the following 
names:
 {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name conflict 
in the generated code: {{agg_expr_11}}.

Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
source of the conflict. Maybe simply using an underscore can solve this issue: 
s"${fullName}_$id"

  was:
Considering the following code:
{code:java}
val df1: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
  .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")

val df2: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, "val1", "val2")))
  .toDF("key", "dummy1", "dummy2")

val agg = df1
  .join(df2, df1("key") === df2("key"), "leftouter")
  .groupBy(df1("key"))
  .agg(
avg("col2").as("avg2"),
avg("col3").as("avg3"),
avg("col4").as("avg4"),
avg("col1").as("avg1"),
avg("col5").as("avg5"),
avg("col6").as("avg6")
  )

val head = agg.take(1)
{code}
This logs the following exception:
{code:java}
ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
467, Column 28: Redefinition of parameter "agg_expr_11"
{code}
I am not a Spark expert, but after investigation I realized that the generated 
{{doConsume}} method is responsible for the exception.

Indeed, {{avg}} calls several times 
{{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. 
The 1st time with the 'avg' Expr and a second time for the base aggregation 
Expr (count and sum).

The problem comes from the generation of parameters in CodeGenerator:
{code:java}
  /**
   * Returns a term name that is unique within this instance of a 
`CodegenContext`.
   */
  def freshName(name: String): String = synchronized {
val fullName = if (freshNamePrefix == "") {
  name
} else {
  s"${freshNamePrefix}_$name"
}
if (freshNameIds.contains(fullName)) {
  val id = freshNameIds(fullName)
  freshNameIds(fullName) = id + 1
  s"$fullName$id"
} else {
  freshNameIds += fullName -> 1
  fullName
}
  }
{code}
The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
 The second call is made with {{agg_expr_[1..12]}} and generates the following 
names:
 {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name conflict 
in the generated code: {{agg_expr_11}}.

Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
source of the conflict. Maybe simply using an underscore can solve this issue: 
s"${fullName}_$id"


> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: 

[jira] [Updated] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-16 Thread Michel Davit (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michel Davit updated SPARK-23986:
-
Description: 
Considering the following code:
{code:java}
val df1: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
  .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")

val df2: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, "val1", "val2")))
  .toDF("key", "dummy1", "dummy2")

val agg = df1
  .join(df2, df1("key") === df2("key"), "leftouter")
  .groupBy(df1("key"))
  .agg(
avg("col2").as("avg2"),
avg("col3").as("avg3"),
avg("col4").as("avg4"),
avg("col1").as("avg1"),
avg("col5").as("avg5"),
avg("col6").as("avg6")
  )

val head = agg.take(1)
{code}
This logs the following exception:
{code:java}
ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
467, Column 28: Redefinition of parameter "agg_expr_11"
{code}
I am not a Spark expert, but after investigation I realized that the generated 
{{doConsume}} method is responsible for the exception.

Indeed, {{avg}} calls several times 
{{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. 
The 1st time with the 'avg' Expr and a second time for the base aggregation 
Expr (count and sum).

The problem comes from the generation of parameters in CodeGenerator:
{code:java}
  /**
   * Returns a term name that is unique within this instance of a 
`CodegenContext`.
   */
  def freshName(name: String): String = synchronized {
val fullName = if (freshNamePrefix == "") {
  name
} else {
  s"${freshNamePrefix}_$name"
}
if (freshNameIds.contains(fullName)) {
  val id = freshNameIds(fullName)
  freshNameIds(fullName) = id + 1
  s"$fullName$id"
} else {
  freshNameIds += fullName -> 1
  fullName
}
  }
{code}
The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
 The second call is made with {{agg_expr_[1..12]}} and generates the following 
names:
 {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name conflict 
in the generated code: {{agg_expr_11}}.

Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
source of the conflict. Maybe simply using an underscore can solve this issue: 
s"${fullName}_$id"

  was:
Considering the following code:
{code:java}
val df1: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
  .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")

val df2: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, "val1", "val2")))
  .toDF("key", "dummy1", "dummy2")

val agg = df1
  .join(df2, df1("key") === df2("key"), "leftouter")
  .groupBy(df1("key"))
  .agg(
avg("col2").as("avg2"),
avg("col3").as("avg3"),
avg("col4").as("avg4"),
avg("col1").as("avg1"),
avg("col5").as("avg5"),
avg("col6").as("avg6")
  )

val head = agg.take(1)
{code}
This logs the following exception:
{code:java}
ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
467, Column 28: Redefinition of parameter "agg_expr_11"
{code}
I am not a Spark expert, but after investigation I realized that the generated 
{{doConsume}} method is responsible for the exception.

Indeed, {{avg}} calls several times 
{{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. 
The 1st time with the 'avg' Expr and a second time for the base aggregation 
Expr (count and sum).

The problem comes from the generation of parameters in CodeGenerator:
{code:java}
  /**
   * Returns a term name that is unique within this instance of a 
`CodegenContext`.
   */
  def freshName(name: String): String = synchronized {
val fullName = if (freshNamePrefix == "") {
  name
} else {
  s"${freshNamePrefix}_$name"
}
if (freshNameIds.contains(fullName)) {
  val id = freshNameIds(fullName)
  freshNameIds(fullName) = id + 1
  s"$fullName$id"
} else {
  freshNameIds += fullName -> 1
  fullName
}
  }
{code}
The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
 The second call is made with {{agg_expr_[1..12]}} and generates the following 
names:
 {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have 2 parameter name conflicts 
in the generated code: {{agg_expr_11}} and {{agg_expr_12}}.

Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
source of the conflict. Maybe simply using an underscore can solve this issue: 
s"${fullName}_$id"


> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: 

[jira] [Commented] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-16 Thread Michel Davit (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439318#comment-16439318
 ] 

Michel Davit commented on SPARK-23986:
--

I tested on Spark v2.3.0.
I attached the generated code: [^spark-generated.java]. Here is the faulty 
line (467):
{code:java}
private void agg_doConsume1(int agg_expr_01, double agg_expr_11, boolean 
agg_exprIsNull_1, long agg_expr_21, boolean agg_exprIsNull_2, double 
agg_expr_31, boolean agg_exprIsNull_3, long agg_expr_41, boolean 
agg_exprIsNull_4, double agg_expr_51, boolean agg_exprIsNull_5, long 
agg_expr_61, boolean agg_exprIsNull_6, double agg_expr_7, boolean 
agg_exprIsNull_7, long agg_expr_8, boolean agg_exprIsNull_8, double agg_expr_9, 
boolean agg_exprIsNull_9, long agg_expr_10, boolean agg_exprIsNull_10, double 
agg_expr_11, boolean agg_exprIsNull_11, long agg_expr_12, boolean 
agg_exprIsNull_12) throws java.io.IOException
{code}
One clarification: the code does not throw, it just logs an error. I also 
checked the computed average values, and everything seems correct.

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Priority: Major
> Attachments: spark-generated.java
>
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the 
> generated {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} calls several times 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. 
> The 1st time with the 'avg' Expr and a second time for the base aggregation 
> Expr (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
>  The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names:
> {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have 2 parameter name 
> conflicts in the generated code: {{agg_expr_11}} and {{agg_expr_12}}.
> Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
> source of the conflict. Maybe simply using an underscore can solve this 
> issue: s"${fullName}_$id"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-16 Thread Michel Davit (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michel Davit updated SPARK-23986:
-
Attachment: spark-generated.java

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Priority: Major
> Attachments: spark-generated.java
>
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the 
> generated {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} calls several times 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. 
> The 1st time with the 'avg' Expr and a second time for the base aggregation 
> Expr (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
>  The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names:
> {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have 2 parameter name 
> conflicts in the generated code: {{agg_expr_11}} and {{agg_expr_12}}.
> Appending the 'id' in s"$fullName$id" to generate a unique term name is the 
> source of the conflict. Maybe simply using an underscore can solve this 
> issue: s"${fullName}_$id"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23991) data loss when allocateBlocksToBatch

2018-04-16 Thread kevin fu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kevin fu updated SPARK-23991:
-
Description: 
With checkpointing and the WAL enabled, the driver writes the allocation of 
blocks to a batch into HDFS. However, if that write fails as shown below, the 
blocks of this batch cannot be computed by the DAG, because the blocks have 
already been dequeued from the receivedBlockQueue and are lost.
{panel:title=error log}
18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing 
record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> 
ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: 
Exception thrown in awaitResult: at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
 at 
org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192) at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused by: 
java.util.concurrent.TimeoutException: Futures timed out after [5000 
milliseconds] at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at 
scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:190) at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 
more 18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch 
152376548 ms needs to be processed again in WAL recovery{panel}
The code in question is shown below:
{code}
  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
  val streamIdToBlocks = streamIds.map { streamId =>
  (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
  }.toMap
  val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
  if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
  } else {
logInfo(s"Possibly processed batch $batchTime needs to be processed 
again in WAL recovery")
  }
} else {
  // This situation occurs when:
  // 1. WAL is ended with BatchAllocationEvent, but without 
BatchCleanupEvent,
  // possibly processed batch job or half-processed batch job need to be 
processed again,
  // so the batchTime will be equal to lastAllocatedBatchTime.
  // 2. Slow checkpointing makes recovered batch time older than WAL 
recovered
  // lastAllocatedBatchTime.
  // This situation will only occurs in recovery time.
  logInfo(s"Possibly processed batch $batchTime needs to be processed again 
in WAL recovery")
}
  }

{code}
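
A generic sketch of the hazard (illustrative only, not the actual fix): when 
the queue is drained before the durable write is confirmed, a failed write 
loses the drained items; keeping them available for retry, for example by 
re-enqueuing them on failure, avoids the loss.

{code:scala}
import scala.collection.mutable

// Illustrative only: items dequeued before the write is known to be durable.
val queue = mutable.Queue(1, 2, 3)

def writeToLog(batch: Seq[Int]): Boolean = false   // simulate a failed WAL write

val drained = queue.dequeueAll(_ => true)          // queue is now empty
if (!writeToLog(drained)) {
  // Without this step the drained items are gone from both the queue and the
  // log -- the situation described above. Re-enqueue them so a later retry can
  // still allocate these blocks.
  drained.foreach(queue.enqueue(_))
}
{code}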

  was:
With checkpointing and the WAL enabled, the driver writes the allocation of 
blocks to a batch into HDFS. However, if that write fails as shown below, the 
blocks of this batch cannot be computed by the DAG, because the blocks have 
already been dequeued from the receivedBlockQueue and are lost.


{panel:title=error log}
18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing 
record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> 
ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: 
Exception thrown in awaitResult: at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
 at 

[jira] [Updated] (SPARK-23991) data loss when allocateBlocksToBatch

2018-04-16 Thread kevin fu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kevin fu updated SPARK-23991:
-
Description: 
With checkpointing and the WAL enabled, the driver writes the allocation of 
blocks to a batch into HDFS. However, if that write fails as shown below, the 
blocks of this batch cannot be computed by the DAG, because the blocks have 
already been dequeued from the receivedBlockQueue and are lost.


{panel:title=error log}
18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing 
record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> 
ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: 
Exception thrown in awaitResult: at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
 at 
org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192) at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused by: 
java.util.concurrent.TimeoutException: Futures timed out after [5000 
milliseconds] at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at 
scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:190) at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 
more
{panel}


The code in question is shown below:


{code:scala}
  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      //    possibly processed batch job or half-processed batch job need to be processed again,
      //    so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      //    lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
{code}
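
To make the failure mode concrete, the following is a minimal, self-contained sketch 
(plain Scala, not Spark internals; `receivedBlockQueue`, `writeToLog`, and the block 
names are hypothetical stand-ins) showing why draining the queue before the WAL write 
is confirmed loses the blocks, plus one possible recovery pattern that re-enqueues 
them when the write fails:

{code:scala}
import scala.collection.mutable

object WalAllocationSketch {

  // Hypothetical stand-in for the per-stream received block queue.
  private val receivedBlockQueue = mutable.Queue("block-1", "block-2", "block-3")

  // Stand-in for writeToLog: pretend the write-ahead-log write times out, as in the log above.
  private def writeToLog(blocks: Seq[String]): Boolean = false

  // Current shape of the logic: the queue is drained before we know whether the
  // WAL write succeeded, so a failed write leaves the blocks nowhere.
  def allocateLossy(): Unit = {
    val blocks = receivedBlockQueue.dequeueAll(_ => true)
    if (!writeToLog(blocks)) {
      println(s"WAL write failed: ${blocks.size} blocks dropped, queue size = ${receivedBlockQueue.size}")
    }
  }

  // One possible mitigation: put the blocks back so a later batch can retry them.
  def allocateWithRequeue(): Unit = {
    val blocks = receivedBlockQueue.dequeueAll(_ => true)
    if (!writeToLog(blocks)) {
      blocks.foreach(b => receivedBlockQueue.enqueue(b))
      println(s"WAL write failed: blocks restored, queue size = ${receivedBlockQueue.size}")
    }
  }

  def main(args: Array[String]): Unit = {
    allocateWithRequeue()   // prints: WAL write failed: blocks restored, queue size = 3
    allocateLossy()         // prints: WAL write failed: 3 blocks dropped, queue size = 0
  }
}
{code}

Whether re-enqueueing (or only draining after a successful write) is the right fix for 
Spark itself is a separate question; the sketch only isolates the ordering problem.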


  was:
with checkpoint and WAL enabled, driver will write the allocation of blocks to 
batch into hdfs. however, if it fails as following, the blocks of this batch 
cannot be computed by the DAG. Because the blocks have been dequeued from the 
receivedBlockQueue and get lost.

{quote}18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while 
writing record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> 
ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: 
Exception thrown in awaitResult: at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
 at 

[jira] [Created] (SPARK-23991) data loss when allocateBlocksToBatch

2018-04-16 Thread kevin fu (JIRA)
kevin fu created SPARK-23991:


 Summary: data loss when allocateBlocksToBatch
 Key: SPARK-23991
 URL: https://issues.apache.org/jira/browse/SPARK-23991
 Project: Spark
  Issue Type: Bug
  Components: DStreams, Input/Output
Affects Versions: 2.2.0
 Environment: spark 2.11
Reporter: kevin fu


With checkpointing and the WAL enabled, the driver writes each batch's block 
allocation to HDFS. However, if that write fails as shown below, the blocks of 
this batch can no longer be computed by the DAG, because they have already been 
dequeued from the receivedBlockQueue and are lost.

{quote}18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while 
writing record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> 
ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: 
Exception thrown in awaitResult: at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
 at 
org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192) at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused by: 
java.util.concurrent.TimeoutException: Futures timed out after [5000 
milliseconds] at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at 
scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:190) at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 
more{quote}

The relevant code is shown below:

{code:scala}
  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      //    possibly processed batch job or half-processed batch job need to be processed again,
      //    so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      //    lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23990) Instruments logging improvements - ML regression package

2018-04-16 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23990:


Assignee: (was: Apache Spark)

> Instruments logging improvements - ML regression package
> 
>
> Key: SPARK-23990
> URL: https://issues.apache.org/jira/browse/SPARK-23990
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 2.3.0
> Environment: Instruments logging improvements - ML regression package
>Reporter: Weichen Xu
>Priority: Major
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23990) Instruments logging improvements - ML regression package

2018-04-16 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439268#comment-16439268
 ] 

Apache Spark commented on SPARK-23990:
--

User 'WeichenXu123' has created a pull request for this issue:
https://github.com/apache/spark/pull/21078

> Instruments logging improvements - ML regression package
> 
>
> Key: SPARK-23990
> URL: https://issues.apache.org/jira/browse/SPARK-23990
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 2.3.0
> Environment: Instruments logging improvements - ML regression package
>Reporter: Weichen Xu
>Priority: Major
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23990) Instruments logging improvements - ML regression package

2018-04-16 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23990:


Assignee: Apache Spark

> Instruments logging improvements - ML regression package
> 
>
> Key: SPARK-23990
> URL: https://issues.apache.org/jira/browse/SPARK-23990
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 2.3.0
> Environment: Instruments logging improvements - ML regression package
>Reporter: Weichen Xu
>Assignee: Apache Spark
>Priority: Major
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23990) Instruments logging improvements - ML regression package

2018-04-16 Thread Weichen Xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-23990:
---
Issue Type: Sub-task  (was: Bug)
Parent: SPARK-23686

> Instruments logging improvements - ML regression package
> 
>
> Key: SPARK-23990
> URL: https://issues.apache.org/jira/browse/SPARK-23990
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 2.3.0
> Environment: Instruments logging improvements - ML regression package
>Reporter: Weichen Xu
>Priority: Major
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23990) Instruments logging improvements - ML regression package

2018-04-16 Thread Weichen Xu (JIRA)
Weichen Xu created SPARK-23990:
--

 Summary: Instruments logging improvements - ML regression package
 Key: SPARK-23990
 URL: https://issues.apache.org/jira/browse/SPARK-23990
 Project: Spark
  Issue Type: Bug
  Components: ML
Affects Versions: 2.3.0
 Environment: Instruments logging improvements - ML regression package
Reporter: Weichen Xu






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread liuxian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439231#comment-16439231
 ] 

liuxian edited comment on SPARK-23989 at 4/16/18 10:18 AM:
---

For `SortShuffleWriter`, `records: Iterator[Product2[K, V]]` is an iterator of 
key-value pairs, but the value is of type `UnsafeRow`.

For example, when we insert the first record into `PartitionedPairBuffer`, we 
only save the object reference ('AnyRef'); the reference for the next record 
(only the value, not the key) is the same as the first record's, so the first 
record is overwritten.
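
A minimal sketch (plain Scala, not Spark internals; `MutableRow` is a hypothetical 
stand-in for a reused `UnsafeRow`) of the effect described above: buffering only the 
reference to a reused, mutable record makes every buffered entry show the last value, 
while a defensive copy preserves each one.

{code:scala}
import scala.collection.mutable.ArrayBuffer

// Hypothetical mutable record that an iterator reuses across next() calls.
final class MutableRow(var value: Int) {
  def copyRow(): MutableRow = new MutableRow(value)
  override def toString: String = s"MutableRow($value)"
}

object ReusedRowSketch {
  def main(args: Array[String]): Unit = {
    val reused = new MutableRow(0)

    // Iterator that mutates and re-emits the same object, like a reused UnsafeRow.
    val records = Iterator.tabulate(3) { i => reused.value = i; reused }
    val buffer = ArrayBuffer.empty[MutableRow]
    records.foreach(r => buffer += r)            // stores three references to one object
    println(buffer.mkString(", "))               // MutableRow(2), MutableRow(2), MutableRow(2)

    val records2 = Iterator.tabulate(3) { i => reused.value = i; reused }
    val copied = ArrayBuffer.empty[MutableRow]
    records2.foreach(r => copied += r.copyRow()) // defensive copy keeps each value
    println(copied.mkString(", "))               // MutableRow(0), MutableRow(1), MutableRow(2)
  }
}
{code}

Whether Spark actually needs a copy at this point, or whether the rows are serialized 
or copied upstream before they can be reused, is the question this ticket raises.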


was (Author: 10110346):
For {color:#33}`SortShuffleWriter`{color},  `records: 
{color:#4e807d}Iterator{color}[Product2[{color:#4e807d}K{color}{color:#cc7832}, 
{color}{color:#4e807d}V{color}]]` is key-value pair, but the value is 
'UnsafeRow' type.

For example ,we insert the first record  {color:#33}into 
`PartitionedPairBuffer`, we only save the  '{color:#cc7832}AnyRef{color}',   
but the {color:#33} '{color:#cc7832}AnyRef{color}'{color}  of  next  
{color}record(only value, not key)  is same as the first record  , so the first 
record  is overwritten.
h1. overwritten

> When using `SortShuffleWriter`, the data will be overwritten
> 
>
> Key: SPARK-23989
> URL: https://issues.apache.org/jira/browse/SPARK-23989
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Critical
>
> When using `SortShuffleWriter`, we only insert an object reference ('AnyRef') 
> into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> override def write(records: Iterator[Product2[K, V]])
> the values in 'records' are `UnsafeRow` instances, so the value will be 
> overwritten.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread liuxian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439231#comment-16439231
 ] 

liuxian commented on SPARK-23989:
-

For `SortShuffleWriter`, `records: Iterator[Product2[K, V]]` is an iterator of 
key-value pairs, but the value is of type `UnsafeRow`.

For example, when we insert the first record into `PartitionedPairBuffer`, we 
only save the object reference ('AnyRef'); the reference for the next record 
(only the value, not the key) is the same as the first record's, so the first 
record is overwritten.

> When using `SortShuffleWriter`, the data will be overwritten
> 
>
> Key: SPARK-23989
> URL: https://issues.apache.org/jira/browse/SPARK-23989
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Critical
>
> When using `SortShuffleWriter`, we only insert an object reference ('AnyRef') 
> into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> override def write(records: Iterator[Product2[K, V]])
> the values in 'records' are `UnsafeRow` instances, so the value will be 
> overwritten.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread Wenchen Fan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439205#comment-16439205
 ] 

Wenchen Fan commented on SPARK-23989:
-

Can you be more specific about what the problem is?

> When using `SortShuffleWriter`, the data will be overwritten
> 
>
> Key: SPARK-23989
> URL: https://issues.apache.org/jira/browse/SPARK-23989
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Critical
>
> When using `SortShuffleWriter`, we only insert an object reference ('AnyRef') 
> into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> override def write(records: Iterator[Product2[K, V]])
> the values in 'records' are `UnsafeRow` instances, so the value will be 
> overwritten.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread liuxian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439148#comment-16439148
 ] 

liuxian edited comment on SPARK-23989 at 4/16/18 9:00 AM:
--

[~joshrosen] [~cloud_fan]


was (Author: 10110346):
[~joshrosen]

> When using `SortShuffleWriter`, the data will be overwritten
> 
>
> Key: SPARK-23989
> URL: https://issues.apache.org/jira/browse/SPARK-23989
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Critical
>
> When using `SortShuffleWriter`, we only insert an object reference ('AnyRef') 
> into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> override def write(records: Iterator[Product2[K, V]])
> the values in 'records' are `UnsafeRow` instances, so the value will be 
> overwritten.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread liuxian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439148#comment-16439148
 ] 

liuxian commented on SPARK-23989:
-

[~joshrosen]

> When using `SortShuffleWriter`, the data will be overwritten
> 
>
> Key: SPARK-23989
> URL: https://issues.apache.org/jira/browse/SPARK-23989
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Critical
>
> When using `SortShuffleWriter`, we only insert an object reference ('AnyRef') 
> into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> override def write(records: Iterator[Product2[K, V]])
> the values in 'records' are `UnsafeRow` instances, so the value will be 
> overwritten.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread liuxian (JIRA)
liuxian created SPARK-23989:
---

 Summary: When using `SortShuffleWriter`, the data will be 
overwritten
 Key: SPARK-23989
 URL: https://issues.apache.org/jira/browse/SPARK-23989
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: liuxian


When using `SortShuffleWriter`, we only insert an object reference ('AnyRef') 
into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.

For this function:

override def write(records: Iterator[Product2[K, V]])

the values in 'records' are `UnsafeRow` instances, so the value will be 
overwritten.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread liuxian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liuxian updated SPARK-23989:

Description: 
When using `SortShuffleWriter`, we only insert an object reference ('AnyRef') 
into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.

For this function:

override def write(records: Iterator[Product2[K, V]])

the values in 'records' are `UnsafeRow` instances, so the value will be 
overwritten.

  was:
{color:#33}When using `SortShuffleWriter`, we only insert  
'{color}{color:#cc7832}AnyRef{color}{color:#33}' into '{color}

PartitionedAppendOnlyMap{color:#33}' or 
'{color}PartitionedPairBuffer{color:#33}'.{color}

{color:#33}For this function:{color}

{color:#cc7832}override def {color}{color:#ffc66d}write{color}(records: 
{color:#4e807d}Iterator{color}[Product2[{color:#4e807d}K{color}{color:#cc7832}, 
{color}{color:#4e807d}V{color}]])

the value of 'records' is `UnsafeRow`, so  the value will be overwritten

{color:#33} {color}


> When using `SortShuffleWriter`, the data will be overwritten
> 
>
> Key: SPARK-23989
> URL: https://issues.apache.org/jira/browse/SPARK-23989
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Critical
>
> When using `SortShuffleWriter`, we only insert an object reference ('AnyRef') 
> into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> override def write(records: Iterator[Product2[K, V]])
> the values in 'records' are `UnsafeRow` instances, so the value will be 
> overwritten.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21033) fix the potential OOM in UnsafeExternalSorter

2018-04-16 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439118#comment-16439118
 ] 

Apache Spark commented on SPARK-21033:
--

User 'wangyum' has created a pull request for this issue:
https://github.com/apache/spark/pull/21077

> fix the potential OOM in UnsafeExternalSorter
> -
>
> Key: SPARK-21033
> URL: https://issues.apache.org/jira/browse/SPARK-21033
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 2.3.0
>
>
> In `UnsafeInMemorySorter`, one record may take 32 bytes: 1 `long` for the 
> pointer, 1 `long` for the key prefix, and another 2 `long`s as the temporary 
> buffer for radix sort.
> In `UnsafeExternalSorter`, we set `DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD` 
> to `1024 * 1024 * 1024 / 2`, hoping to cap the pointer array at 8 GB. However 
> this is wrong: `1024 * 1024 * 1024 / 2 * 32` is actually 16 GB, and if we grow 
> the pointer array before reaching this threshold, we may hit the max-page-size 
> error.
> Users may see exception like this on large dataset:
> {code}
> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with 
> more than 17179869176 bytes
> at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
> at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
> ...
> {code}
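
A quick standalone check of the sizing arithmetic quoted above (values taken from the 
description; nothing here calls Spark):

{code:scala}
object SpillThresholdCheck {
  def main(args: Array[String]): Unit = {
    val bytesPerRecord = 32L                        // pointer + key prefix + 2-long radix-sort buffer
    val numElements    = 1024L * 1024L * 1024L / 2L // DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD
    val totalBytes     = numElements * bytesPerRecord
    // Prints "536870912 records * 32 bytes = 16 GiB", i.e. twice the intended 8 GB cap.
    println(s"$numElements records * $bytesPerRecord bytes = ${totalBytes / (1L << 30)} GiB")
  }
}
{code}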



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-04-16 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439114#comment-16439114
 ] 

Ohad Raviv commented on SPARK-23985:


I see in the Optimizer that filters are pushed down only if they appear in the 
partitionSpec exactly as they are.
It looks like we need to add some property to Expression that indicates whether 
a filter can be pushed through it.
An even simpler example than Concat would be Struct.
[~cloud_fan] - I see you worked on this code about a year ago, could you please 
take a look?

Ohad.
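
A toy model of the behaviour described in the comment above (not Catalyst code; the 
classes and the `pushedByCurrentRule` check are hypothetical stand-ins): the filter is 
pushed only when the columns it references appear in the partitionSpec as plain 
attributes, so a spec like concat(a, 'lit') defeats the pushdown even though it 
depends on `a` alone.

{code:scala}
sealed trait Expr { def references: Set[String] }
case class Col(name: String) extends Expr { def references: Set[String] = Set(name) }
case class Lit(value: String) extends Expr { def references: Set[String] = Set.empty }
case class Concat(children: Seq[Expr]) extends Expr {
  def references: Set[String] = children.flatMap(_.references).toSet
}

object WindowPushdownSketch {
  // The check as described above: only plain column references in the partition
  // spec count, not columns buried inside expressions such as Concat.
  def pushedByCurrentRule(filterRefs: Set[String], partitionSpec: Seq[Expr]): Boolean =
    filterRefs.subsetOf(partitionSpec.collect { case Col(n) => n }.toSet)

  def main(args: Array[String]): Unit = {
    println(pushedByCurrentRule(Set("a"), Seq(Col("a"))))                          // true
    println(pushedByCurrentRule(Set("a"), Seq(Concat(Seq(Col("a"), Lit("lit")))))) // false: the reported gap
  }
}
{code}

Deciding when it is actually safe to push a filter through such expressions (the 
property the comment proposes adding to Expression) is the open part of this ticket.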

> predicate push down doesn't work with simple compound partition spec
> 
>
> Key: SPARK-23985
> URL: https://issues.apache.org/jira/browse/SPARK-23985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> while predicate push down works with this query: 
> {code:sql}
> select *, row_number() over (partition by a order by b) from t1 where a>1
> {code}
> it doesn't work with:
> {code:sql}
> select *, row_number() over (partition by concat(a,'lit') order by b) from t1 
> where a>1
> {code}
>  
> I added a test to FilterPushdownSuite which I think recreates the problem:
> {code:scala}
>   test("Window: predicate push down -- ohad") {
> val winExpr = windowExpr(count('b),
>   windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
> val originalQuery = testRelation.select('a, 'b, 'c, 
> winExpr.as('window)).where('a > 1)
> val correctAnswer = testRelation
>   .where('a > 1).select('a, 'b, 'c)
>   .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>   .select('a, 'b, 'c, 'window).analyze
> comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
>   }
> {code}
> will try to create a PR with a correction



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org