[jira] [Created] (SPARK-27225) Implement join strategy hints

2019-03-20 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-27225:
---

 Summary: Implement join strategy hints
 Key: SPARK-27225
 URL: https://issues.apache.org/jira/browse/SPARK-27225
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


Extend the existing BROADCAST join hint by implementing other join strategy 
hints corresponding to the rest of Spark's existing join strategies: 
shuffle-hash, sort-merge, and cartesian-product. Broadcast-nested-loop will 
continue to use the BROADCAST hint as it does now.
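
For illustration, a minimal sketch of how such strategy hints might be expressed through the existing {{Dataset.hint}} API. The hint names other than the already-supported broadcast hint are placeholders, not names decided by this ticket.

{code:scala}
// Sketch only: "shuffle_hash" and "merge" below are hypothetical hint names
// standing in for the proposed shuffle-hash and sort-merge strategy hints.
val large = spark.range(1000000L).toDF("id")
val small = spark.range(100L).toDF("id")

// Works today: request a broadcast join of the small side.
val broadcasted = large.join(small.hint("broadcast"), "id")

// Hypothetical once this ticket lands: request other join strategies.
val shuffleHash = large.join(small.hint("shuffle_hash"), "id")
val sortMerge   = large.join(small.hint("merge"), "id")
{code}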






[jira] [Updated] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait

2019-03-20 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-27214:
-
Description: 
Currently, Spark's locality wait mechanism is not friendly to large jobs. When 
the number of tasks is large (e.g. 1+) and there is a large number of executors 
(e.g. 2000), executors may be launched on nodes where locality is not the best 
(i.e. not the nodes holding the HDFS blocks). There are cases where 
`TaskSetManager.lastLaunchTime` keeps being refreshed because tasks finish within 
`spark.locality.wait`, but only at a low rate (e.g. one task finishes every 
`spark.locality.wait` seconds), so the locality level is never upgraded and lots 
of pending tasks wait for a long time.

In this case, when `spark.dynamicAllocation.enabled=true`, many executors may be 
removed by the driver because they become idle, which ultimately slows down the 
job.

We encountered this issue in our production Spark cluster; it wasted a lot of 
resources and slowed down users' applications.

We can optimize this with the following formula:

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5
{code:java}
maxStarvingTimeForTasks = numPendingTasks * medianOfTaskExecutionTime *
  localityExecutionGainFactor * probabilityOfNextLocalitySchedule

totalStarvingTime = sum(starvingTimeByTasks)

if (totalStarvingTime > maxStarvingTimeForTasks) {
  // upgrade locality level...
}
{code}
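
A minimal Scala sketch of the check described by the formula (names follow the formula above; how the starving times would actually be collected inside TaskSetManager is not specified here and is an assumption):

{code:scala}
// Sketch only: returns true when the accumulated starving time of pending
// tasks exceeds the estimated benefit of keeping the current locality level.
def shouldUpgradeLocality(
    numPendingTasks: Int,
    medianTaskExecutionTimeMs: Double,
    starvingTimeByTasksMs: Seq[Long],
    localityExecutionGainFactor: Double = 0.1,
    probabilityOfNextLocalitySchedule: Double = 0.5): Boolean = {
  val maxStarvingTimeForTasks =
    numPendingTasks * medianTaskExecutionTimeMs *
      localityExecutionGainFactor * probabilityOfNextLocalitySchedule
  val totalStarvingTime = starvingTimeByTasksMs.sum.toDouble
  totalStarvingTime > maxStarvingTimeForTasks
}

// Example: 10000 pending tasks, median task time 2s, each starving ~3s.
shouldUpgradeLocality(10000, 2000.0, Seq.fill(10000)(3000L))
{code}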
 

 

  was:
Currently, Spark locality wait mechanism is not friendly for large job, when 
number of tasks is large(e.g. 1+)and with a large number of executors(e.g. 
2000), executors may be launched on some nodes  where the locality is not the 
best(not the same nodes hold HDFS blocks). There are cases when 
`TaskSetManager.lastLaunchTime` is refreshed due to finished tasks within 
`spark.locality.wait` but coming at low rate(e.g. every `spark.locality.wait` 
seconds a task is finished), so locality level would not be upgraded and lots 
of pending tasks will wait a long time. 

In this case, when `spark.dynamicAllocation.enabled=true`, then lots of 
executors may be removed by Driver due to become idle and finally slow down the 
job.

Actually, we can optimize this by following formula:

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5

```

maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * 
localityExecutionGainFactor * probabilityOfNextLocalitySchedule / 
`spark.locality.wait`

if (numStavingTasks > maxStarvingTasks)

{  upgrading locality level... }



```


> Upgrading locality level when lots of pending tasks have been waiting more 
> than locality.wait
> -
>
> Key: SPARK-27214
> URL: https://issues.apache.org/jira/browse/SPARK-27214
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, Spark's locality wait mechanism is not friendly to large jobs. When 
> the number of tasks is large (e.g. 1+) and there is a large number of 
> executors (e.g. 2000), executors may be launched on nodes where locality is 
> not the best (i.e. not the nodes holding the HDFS blocks). There are cases 
> where `TaskSetManager.lastLaunchTime` keeps being refreshed because tasks 
> finish within `spark.locality.wait`, but only at a low rate (e.g. one task 
> finishes every `spark.locality.wait` seconds), so the locality level is never 
> upgraded and lots of pending tasks wait for a long time. 
> In this case, when `spark.dynamicAllocation.enabled=true`, many executors may 
> be removed by the driver because they become idle, which ultimately slows 
> down the job.
> We encountered this issue in our production Spark cluster; it wasted a lot of 
> resources and slowed down users' applications.
> We can optimize this with the following formula:
> Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
> probabilityOfNextLocalitySchedule=0.5
> {code:java}
> maxStarvingTimeForTasks = numPendingTasks * medianOfTaskExecutionTime *
>   localityExecutionGainFactor * probabilityOfNextLocalitySchedule
> totalStarvingTime = sum(starvingTimeByTasks)
> if (totalStarvingTime > maxStarvingTimeForTasks) {
>   // upgrade locality level...
> }
> {code}
>  
>  






[jira] [Resolved] (SPARK-26894) Fix Alias handling in AggregateEstimation

2019-03-20 Thread Takeshi Yamamuro (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Takeshi Yamamuro resolved SPARK-26894.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Resolved by https://github.com/apache/spark/pull/23803

> Fix Alias handling in AggregateEstimation
> -
>
> Key: SPARK-26894
> URL: https://issues.apache.org/jira/browse/SPARK-26894
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
> Fix For: 3.0.0
>
>
> Aliases are not handled in AggregateEstimation the way they are in 
> ProjectEstimation, so column statistics are not propagated when CBO is 
> enabled.
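
For context, a sketch of the kind of query affected (illustrative only; the table and column names are made up, and it assumes column statistics have been collected):

{code:scala}
// With CBO enabled, estimating the aggregate below requires looking through
// the alias `k` to find the column statistics collected for `key`.
spark.conf.set("spark.sql.cbo.enabled", "true")

spark.range(1000).selectExpr("id % 10 AS key", "id AS value")
  .write.mode("overwrite").saveAsTable("t")
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key")

val q = spark.sql("SELECT key AS k, count(*) AS cnt FROM t GROUP BY key")
println(q.queryExecution.optimizedPlan.stats)  // estimate should reflect the stats for `key`
{code}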






[jira] [Resolved] (SPARK-27223) Remove private methods that skip conversion when passing user schemas for constructing a DataFrame

2019-03-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27223.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24162
[https://github.com/apache/spark/pull/24162]

> Remove private methods that skip conversion when passing user schemas for 
> constructing a DataFrame
> --
>
> Key: SPARK-27223
> URL: https://issues.apache.org/jira/browse/SPARK-27223
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maryann Xue
>Assignee: Maryann Xue
>Priority: Minor
> Fix For: 3.0.0
>
>
> When passing in a user schema to create a DataFrame, there might be 
> mismatched nullability between the user schema and the actual data. All 
> related public interfaces now perform catalyst conversion using the 
> user-provided schema, which catches such mismatches to avoid runtime errors 
> later on. However, there are private methods that allow this conversion to be 
> skipped, so we should remove them, as they may lead to confusion and 
> potential issues.






[jira] [Resolved] (SPARK-27221) Improve the assert error message in TreeNode.parseToJson

2019-03-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27221.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24159
[https://github.com/apache/spark/pull/24159]

> Improve the assert error message in TreeNode.parseToJson
> 
>
> Key: SPARK-27221
> URL: https://issues.apache.org/jira/browse/SPARK-27221
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Minor
> Fix For: 3.0.0
>
>
> TreeNode.parseToJson may throw an assert error without any error message when 
> a TreeNode is not implemented properly, which makes it hard to find the bad 
> TreeNode implementation.
> It would be better to improve the error message to indicate the type of TreeNode.






[jira] [Assigned] (SPARK-27223) Remove private methods that skip conversion when passing user schemas for constructing a DataFrame

2019-03-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-27223:


Assignee: Maryann Xue

> Remove private methods that skip conversion when passing user schemas for 
> constructing a DataFrame
> --
>
> Key: SPARK-27223
> URL: https://issues.apache.org/jira/browse/SPARK-27223
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maryann Xue
>Assignee: Maryann Xue
>Priority: Minor
>
> When passing in a user schema to create a DataFrame, there might be 
> mismatched nullability between the user schema and the actual data. All 
> related public interfaces now perform catalyst conversion using the 
> user-provided schema, which catches such mismatches to avoid runtime errors 
> later on. However, there are private methods that allow this conversion to be 
> skipped, so we should remove them, as they may lead to confusion and 
> potential issues.






[jira] [Comment Edited] (SPARK-24105) Spark 2.3.0 on kubernetes

2019-03-20 Thread Kevin Hogeland (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797719#comment-16797719
 ] 

Kevin Hogeland edited comment on SPARK-24105 at 3/21/19 1:07 AM:
-

[~vanzin] Why was this marked "Won't Fix"? This is a major issue.
 * There is a limited amount of resources (constrained either by a 
ResourceQuota or by the size of the cluster)
 * Drivers are scheduled before executors due to the 2-layer scheduling design
 * Drivers consume from the same pool of resources as executors, making it 
possible to consume all available resources
 * If no driver can schedule an executor, all drivers are stalled indefinitely 
(even if they timeout and crash)

Starting too many drivers at the same time _will_ cause a deadlock. Any spiky 
workload is very likely to trigger this eventually, for example when a large 
number of Spark jobs are scheduled daily or hourly. We've been able to reproduce 
this easily in testing.


was (Author: hogeland):
[~vanzin] Why was this marked "Won't Fix"? This is a _major_ issue.
 * There is a limited amount of resources (constrained either by a 
ResourceQuota or by the size of the cluster)
 * Drivers are scheduled before executors due to the 2-layer scheduling design
 * Drivers consume from the same pool of resources as executors, making it 
possible to consume all available resources
 * If no driver can schedule an executor, all drivers are stalled indefinitely 
(even if they timeout and crash)

Starting too many drivers at the same time _will_ cause a deadlock. Any spiky 
workload is very likely to trigger this eventually. For example, if a large 
amount of Spark jobs are scheduled daily/hourly. We've been able to reproduce 
this easily in testing.

> Spark 2.3.0 on kubernetes
> -
>
> Key: SPARK-24105
> URL: https://issues.apache.org/jira/browse/SPARK-24105
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Lenin
>Priority: Major
>
> Right now it is only possible to define node selector configurations through 
> spark.kubernetes.node.selector.[labelKey]. This gets used for both driver and 
> executor pods. Without the capability to isolate driver and executor pods, the 
> cluster can run into a livelock scenario: if there are a lot of spark-submits, 
> the driver pods can fill up the cluster capacity, leaving no room for executor 
> pods to do any work.
>  
> To avoid this deadlock, it is necessary to support node selector (and in the 
> future affinity/anti-affinity) configuration separately for the driver and 
> executors.
>  






[jira] [Comment Edited] (SPARK-24105) Spark 2.3.0 on kubernetes

2019-03-20 Thread Kevin Hogeland (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797719#comment-16797719
 ] 

Kevin Hogeland edited comment on SPARK-24105 at 3/21/19 1:09 AM:
-

[~vanzin] Why was this marked "Won't Fix"? This is a major issue.
 * There are a limited amount of resources (constrained either by a 
ResourceQuota or by the size of the cluster)
 * Drivers are scheduled before executors due to the 2-layer scheduling design
 * Drivers consume from the same pool of resources as executors
 * Starting too many drivers at once will make it impossible for any driver to 
schedule an executor
 * If no driver can schedule an executor, all drivers are stalled indefinitely 
(even if they timeout and crash)

Starting too many drivers at the same time _will_ cause a deadlock. Any spiky 
workload is very likely to trigger this eventually, for example when a large 
number of Spark jobs are scheduled daily or hourly. We've been able to reproduce 
this easily in testing.


was (Author: hogeland):
[~vanzin] Why was this marked "Won't Fix"? This is a major issue.
 * There are a limited amount of resources (constrained either by a 
ResourceQuota or by the size of the cluster)
 * Drivers are scheduled before executors due to the 2-layer scheduling design
 * Drivers consume from the same pool of resources as executors, making it 
possible to consume all available resources
 * If no driver can schedule an executor, all drivers are stalled indefinitely 
(even if they timeout and crash)

Starting too many drivers at the same time _will_ cause a deadlock. Any spiky 
workload is very likely to trigger this eventually. For example, if a large 
amount of Spark jobs are scheduled daily/hourly. We've been able to reproduce 
this easily in testing.

> Spark 2.3.0 on kubernetes
> -
>
> Key: SPARK-24105
> URL: https://issues.apache.org/jira/browse/SPARK-24105
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Lenin
>Priority: Major
>
> Right now it is only possible to define node selector configurations through 
> spark.kubernetes.node.selector.[labelKey]. This gets used for both driver and 
> executor pods. Without the capability to isolate driver and executor pods, the 
> cluster can run into a livelock scenario: if there are a lot of spark-submits, 
> the driver pods can fill up the cluster capacity, leaving no room for executor 
> pods to do any work.
>  
> To avoid this deadlock, it is necessary to support node selector (and in the 
> future affinity/anti-affinity) configuration separately for the driver and 
> executors.
>  






[jira] [Comment Edited] (SPARK-24105) Spark 2.3.0 on kubernetes

2019-03-20 Thread Kevin Hogeland (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797719#comment-16797719
 ] 

Kevin Hogeland edited comment on SPARK-24105 at 3/21/19 1:08 AM:
-

[~vanzin] Why was this marked "Won't Fix"? This is a major issue.
 * There are a limited amount of resources (constrained either by a 
ResourceQuota or by the size of the cluster)
 * Drivers are scheduled before executors due to the 2-layer scheduling design
 * Drivers consume from the same pool of resources as executors, making it 
possible to consume all available resources
 * If no driver can schedule an executor, all drivers are stalled indefinitely 
(even if they timeout and crash)

Starting too many drivers at the same time _will_ cause a deadlock. Any spiky 
workload is very likely to trigger this eventually, for example when a large 
number of Spark jobs are scheduled daily or hourly. We've been able to reproduce 
this easily in testing.


was (Author: hogeland):
[~vanzin] Why was this marked "Won't Fix"? This is a major issue.
 * There is a limited amount of resources (constrained either by a 
ResourceQuota or by the size of the cluster)
 * Drivers are scheduled before executors due to the 2-layer scheduling design
 * Drivers consume from the same pool of resources as executors, making it 
possible to consume all available resources
 * If no driver can schedule an executor, all drivers are stalled indefinitely 
(even if they timeout and crash)

Starting too many drivers at the same time _will_ cause a deadlock. Any spiky 
workload is very likely to trigger this eventually. For example, if a large 
amount of Spark jobs are scheduled daily/hourly. We've been able to reproduce 
this easily in testing.

> Spark 2.3.0 on kubernetes
> -
>
> Key: SPARK-24105
> URL: https://issues.apache.org/jira/browse/SPARK-24105
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Lenin
>Priority: Major
>
> Right now it is only possible to define node selector configurations through 
> spark.kubernetes.node.selector.[labelKey]. This gets used for both driver and 
> executor pods. Without the capability to isolate driver and executor pods, the 
> cluster can run into a livelock scenario: if there are a lot of spark-submits, 
> the driver pods can fill up the cluster capacity, leaving no room for executor 
> pods to do any work.
>  
> To avoid this deadlock, it is necessary to support node selector (and in the 
> future affinity/anti-affinity) configuration separately for the driver and 
> executors.
>  






[jira] [Commented] (SPARK-24105) Spark 2.3.0 on kubernetes

2019-03-20 Thread Kevin Hogeland (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797719#comment-16797719
 ] 

Kevin Hogeland commented on SPARK-24105:


[~vanzin] Why was this marked "Won't Fix"? This is a _major_ issue.
 * There is a limited amount of resources (constrained either by a 
ResourceQuota or by the size of the cluster)
 * Drivers are scheduled before executors due to the 2-layer scheduling design
 * Drivers consume from the same pool of resources as executors, making it 
possible to consume all available resources
 * If no driver can schedule an executor, all drivers are stalled indefinitely 
(even if they timeout and crash)

Starting too many drivers at the same time _will_ cause a deadlock. Any spiky 
workload is very likely to trigger this eventually, for example when a large 
number of Spark jobs are scheduled daily or hourly. We've been able to reproduce 
this easily in testing.

> Spark 2.3.0 on kubernetes
> -
>
> Key: SPARK-24105
> URL: https://issues.apache.org/jira/browse/SPARK-24105
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Lenin
>Priority: Major
>
> Right now it is only possible to define node selector configurations through 
> spark.kubernetes.node.selector.[labelKey]. This gets used for both driver and 
> executor pods. Without the capability to isolate driver and executor pods, the 
> cluster can run into a livelock scenario: if there are a lot of spark-submits, 
> the driver pods can fill up the cluster capacity, leaving no room for executor 
> pods to do any work.
>  
> To avoid this deadlock, it is necessary to support node selector (and in the 
> future affinity/anti-affinity) configuration separately for the driver and 
> executors.
>  






[jira] [Commented] (SPARK-27224) Spark to_json parses UTC timestamp incorrectly

2019-03-20 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797711#comment-16797711
 ] 

Hyukjin Kwon commented on SPARK-27224:
--

This was an unexpected behaviour from SimpleDateFormat if I am not mistaken. 
It's fixed in the current master by explicitly setting {{timestampFormat}}.

{code}
Seq(s"""{"t":"${t}"}""").toDF("json").select(from_json(col("json"), schema,
  Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))).show(false)
{code}

I don't think we should backport this to lower Spark versions since the changes 
are quite big and breaking.

> Spark to_json parses UTC timestamp incorrectly
> --
>
> Key: SPARK-27224
> URL: https://issues.apache.org/jira/browse/SPARK-27224
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Jeff Xu
>Priority: Major
>
> When parsing an ISO-8601 timestamp, if there is a UTC suffix symbol and more 
> than 3 digits in the fractional part, from_json gives an incorrect result.
>  
> {noformat}
> scala> val schema = new StructType().add("t", TimestampType)
> #
> # no "Z", no problem
> #
> scala> val t = "2019-03-20T09:01:03.1234567"
> scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), 
> schema)).show(false)
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 09:01:03.123]|
> +-+
> #
> # Add "Z", incorrect
> #
> scala> val t = "2019-03-20T09:01:03.1234567Z"
> scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), 
> schema)).show(false)
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:21:37.567]|
> +-+
> #
> # reduce the # of digits, the conversion is incorrect until only we reach 3 
> digits
> #
> scala> val t = "2019-03-20T09:01:03.123456Z"
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:03:06.456]|
> +-+
> scala> val t = "2019-03-20T09:01:03.12345Z"
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:01:15.345]|
> +-+
> scala> val t = "2019-03-20T09:01:03.1234Z"
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:01:04.234]|
> +-+
> # correct when there is <=3 digits in fraction
> scala> val t = "2019-03-20T09:01:03.123Z"
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:01:03.123]|
> +-+
> scala> val t = "2019-03-20T09:01:03.999Z"
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:01:03.999]|
> +-+
> {noformat}
>  
> This could be related to SPARK-17914.






[jira] [Commented] (SPARK-27177) Update jenkins locale to en_US.UTF-8

2019-03-20 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797709#comment-16797709
 ] 

Hyukjin Kwon commented on SPARK-27177:
--

Yes, I think we can unblock https://github.com/apache/spark/pull/23823 as well 

> Update jenkins locale to en_US.UTF-8
> 
>
> Key: SPARK-27177
> URL: https://issues.apache.org/jira/browse/SPARK-27177
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build, jenkins
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> Two test cases fail on our Jenkins since HADOOP-12045 (Hadoop 2.8.0). 
> I'd like to update our Jenkins locale to en_US.UTF-8 to work around this issue.
>  How to reproduce:
> {code:java}
> export LANG=
> git clone https://github.com/apache/spark.git && cd spark && git checkout 
> v2.4.0
> build/sbt "hive/testOnly *.HiveDDLSuite" -Phive -Phadoop-2.7 
> -Dhadoop.version=2.8.0
> {code}
> Stack trace:
> {noformat}
> Caused by: sbt.ForkMain$ForkError: java.nio.file.InvalidPathException: 
> Malformed input or input contains unmappable characters: 
> /home/jenkins/workspace/SparkPullRequestBuilder@2/target/tmp/warehouse-15474fdf-0808-40ab-946d-1309fb05bf26/DaTaBaSe_I.db/tab_ı
>   at sun.nio.fs.UnixPath.encode(UnixPath.java:147)
>   at sun.nio.fs.UnixPath.(UnixPath.java:71)
>   at sun.nio.fs.UnixFileSystem.getPath(UnixFileSystem.java:281)
>   at java.io.File.toPath(File.java:2234)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getLastAccessTime(RawLocalFileSystem.java:683)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.(RawLocalFileSystem.java:694)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:664)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:987)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:656)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:454)
>   at org.apache.hadoop.hive.metastore.Warehouse.isDir(Warehouse.java:520)
>   at 
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1436)
>   at 
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1503)
> {noformat}
> Workaround:
> {code:java}
> export LANG=en_US.UTF-8
> build/sbt "hive/testOnly *.HiveDDLSuite" -Phive -Phadoop-2.7 
> -Dhadoop.version=2.8.0
> {code}
> More details: 
> https://issues.apache.org/jira/browse/HADOOP-16180
> https://github.com/apache/spark/pull/24044/commits/4c1ec25d3bc64bf358edf1380a7c863596722362






[jira] [Resolved] (SPARK-27224) Spark to_json parses UTC timestamp incorrectly

2019-03-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27224.
--
Resolution: Cannot Reproduce

> Spark to_json parses UTC timestamp incorrectly
> --
>
> Key: SPARK-27224
> URL: https://issues.apache.org/jira/browse/SPARK-27224
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Jeff Xu
>Priority: Major
>
> When parsing an ISO-8601 timestamp, if there is a UTC suffix symbol and more 
> than 3 digits in the fractional part, from_json gives an incorrect result.
>  
> {noformat}
> scala> val schema = new StructType().add("t", TimestampType)
> #
> # no "Z", no problem
> #
> scala> val t = "2019-03-20T09:01:03.1234567"
> scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), 
> schema)).show(false)
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 09:01:03.123]|
> +-+
> #
> # Add "Z", incorrect
> #
> scala> val t = "2019-03-20T09:01:03.1234567Z"
> scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), 
> schema)).show(false)
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:21:37.567]|
> +-+
> #
> # reduce the # of digits, the conversion is incorrect until only we reach 3 
> digits
> #
> scala> val t = "2019-03-20T09:01:03.123456Z"
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:03:06.456]|
> +-+
> scala> val t = "2019-03-20T09:01:03.12345Z"
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:01:15.345]|
> +-+
> scala> val t = "2019-03-20T09:01:03.1234Z"
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:01:04.234]|
> +-+
> # correct when there is <=3 digits in fraction
> scala> val t = "2019-03-20T09:01:03.123Z"
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:01:03.123]|
> +-+
> scala> val t = "2019-03-20T09:01:03.999Z"
> +-+
> |jsontostructs(json)      |
> +-+
> |[2019-03-20 02:01:03.999]|
> +-+
> {noformat}
>  
> This could be related to SPARK-17914.






[jira] [Commented] (SPARK-27006) SPIP: .NET bindings for Apache Spark

2019-03-20 Thread Markus Weimer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797688#comment-16797688
 ] 

Markus Weimer commented on SPARK-27006:
---

+1 on a .NET API for Spark. Over in 
[ML.NET|https://github.com/dotnet/machinelearning] we have a bunch of users who 
would really benefit from this.

> SPIP: .NET bindings for Apache Spark
> 
>
> Key: SPARK-27006
> URL: https://issues.apache.org/jira/browse/SPARK-27006
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Terry Kim
>Priority: Major
>   Original Estimate: 4,032h
>  Remaining Estimate: 4,032h
>
> h4. Background and Motivation: 
> Apache Spark provides programming language support for Scala/Java (native), 
> and extensions for Python and R. While a variety of other language extensions 
> are possible to include in Apache Spark, .NET would bring one of the largest 
> developer community to the table. Presently, no good Big Data solution exists 
> for .NET developers in open source.  This SPIP aims at discussing how we can 
> bring Apache Spark goodness to the .NET development platform.  
> .NET is a free, cross-platform, open source developer platform for building 
> many different types of applications. With .NET, you can use multiple 
> languages, editors, and libraries to build for web, mobile, desktop, gaming, 
> and IoT types of applications. Even with .NET serving millions of developers, 
> there is no good Big Data solution that exists today, which this SPIP aims to 
> address.  
> The .NET developer community is one of the largest programming language 
> communities in the world. Its flagship programming language C# is listed as 
> one of the most popular programming languages in a variety of articles and 
> statistics: 
>  * Most popular Technologies on Stack Overflow: 
> [https://insights.stackoverflow.com/survey/2018/#most-popular-technologies|https://insights.stackoverflow.com/survey/2018/]
>   
>  * Most popular languages on GitHub 2018: 
> [https://www.businessinsider.com/the-10-most-popular-programming-languages-according-to-github-2018-10#2-java-9|https://www.businessinsider.com/the-10-most-popular-programming-languages-according-to-github-2018-10]
>  
>  * 1M+ new developers last 1 year  
>  * Second most demanded technology on LinkedIn 
>  * Top 30 High velocity OSS projects on GitHub 
> Including a C# language extension in Apache Spark will enable millions of 
> .NET developers to author Big Data applications in their preferred 
> programming language, developer environment, and tooling support. We aim to 
> promote the .NET bindings for Spark through engagements with the Spark 
> community (e.g., we are scheduled to present an early prototype at the SF 
> Spark Summit 2019) and the .NET developer community (e.g., similar 
> presentations will be held at .NET developer conferences this year).  As 
> such, we believe that our efforts will help grow the Spark community by 
> making it accessible to the millions of .NET developers. 
> Furthermore, our early discussions with some large .NET development teams got 
> an enthusiastic reception. 
> We recognize that earlier attempts at this goal (specifically Mobius 
> [https://github.com/Microsoft/Mobius]) were unsuccessful primarily due to the 
> lack of communication with the Spark community. Therefore, another goal of 
> this proposal is to not only develop .NET bindings for Spark in open source, 
> but also continuously seek feedback from the Spark community via posted 
> Jira’s (like this one) and the Spark developer mailing list. Our hope is that 
> through these engagements, we can build a community of developers that are 
> eager to contribute to this effort or want to leverage the resulting .NET 
> bindings for Spark in their respective Big Data applications. 
> h4. Target Personas: 
> .NET developers looking to build big data solutions.  
> h4. Goals: 
> Our primary goal is to help grow Apache Spark by making it accessible to the 
> large .NET developer base and ecosystem. We will also look for opportunities 
> to generalize the interop layers for Spark for adding other language 
> extensions in the future. [SPARK-26257]( 
> https://issues.apache.org/jira/browse/SPARK-26257) proposes such a 
> generalized interop layer, which we hope to address over the course of this 
> project.  
> Another important goal for us is to not only enable Spark as an application 
> solution for .NET developers, but also opening the door for .NET developers 
> to make contributions to Apache Spark itself.   
> Lastly, we aim to develop a .NET extension in the open, while continually 
> engaging with the Spark community for feedback on designs and code. We will 
> welcome PRs from the Spark community throughout this project and aim 

[jira] [Created] (SPARK-27224) Spark to_json parses UTC timestamp incorrectly

2019-03-20 Thread Jeff Xu (JIRA)
Jeff Xu created SPARK-27224:
---

 Summary: Spark to_json parses UTC timestamp incorrectly
 Key: SPARK-27224
 URL: https://issues.apache.org/jira/browse/SPARK-27224
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Jeff Xu


When parsing an ISO-8601 timestamp, if there is a UTC suffix symbol and more than 
3 digits in the fractional part, from_json gives an incorrect result.

 
{noformat}
scala> val schema = new StructType().add("t", TimestampType)

#
# no "Z", no problem
#

scala> val t = "2019-03-20T09:01:03.1234567"
scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), 
schema)).show(false)
+-+
|jsontostructs(json)      |
+-+
|[2019-03-20 09:01:03.123]|
+-+

#
# Add "Z", incorrect
#

scala> val t = "2019-03-20T09:01:03.1234567Z"
scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), 
schema)).show(false)
+-+
|jsontostructs(json)      |
+-+
|[2019-03-20 02:21:37.567]|
+-+

#
# reduce the # of digits, the conversion is incorrect until only we reach 3 
digits
#

scala> val t = "2019-03-20T09:01:03.123456Z"

+-+
|jsontostructs(json)      |
+-+
|[2019-03-20 02:03:06.456]|
+-+

scala> val t = "2019-03-20T09:01:03.12345Z

+-+
|jsontostructs(json)      |
+-+
|[2019-03-20 02:01:15.345]|
+-+


scala> val t = "2019-03-20T09:01:03.1234Z"

+-+
|jsontostructs(json)      |
+-+
|[2019-03-20 02:01:04.234]|
+-+


# correct when there is <=3 digits in fraction


scala> val t = "2019-03-20T09:01:03.123Z"

+-+
|jsontostructs(json)      |
+-+
|[2019-03-20 02:01:03.123]|
+-+


scala> val t = "2019-03-20T09:01:03.999Z"

+-+
|jsontostructs(json)      |
+-+
|[2019-03-20 02:01:03.999]|
+-+

{noformat}
 

This could be related to SPARK-17914.
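
As noted in a comment on this issue, passing an explicit {{timestampFormat}} to {{from_json}} works around this on the current master (sketch; the pattern below is the assumed intended ISO-8601 format string):

{code:scala}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StructType, TimestampType}
import spark.implicits._  // already imported inside spark-shell

val schema = new StructType().add("t", TimestampType)
val t = "2019-03-20T09:01:03.1234567Z"

// Explicit timestamp pattern so the fractional seconds and UTC offset are handled.
Seq(s"""{"t":"${t}"}""").toDF("json")
  .select(from_json(col("json"), schema,
    Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")))
  .show(false)
{code}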






[jira] [Updated] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files

2019-03-20 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27194:
--
Affects Version/s: 2.3.3

> Job failures when task attempts do not clean up spark-staging parquet files
> ---
>
> Key: SPARK-27194
> URL: https://issues.apache.org/jira/browse/SPARK-27194
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.3.1, 2.3.2, 2.3.3
>Reporter: Reza Safi
>Priority: Major
>
> When a container fails for some reason (for example when killed by yarn for 
> exceeding memory limits), the subsequent task attempts for the tasks that 
> were running on that container all fail with a FileAlreadyExistsException. 
> The original task attempt does not seem to successfully call abortTask (or at 
> least its "best effort" delete is unsuccessful) and clean up the parquet file 
> it was writing to, so when later task attempts try to write to the same 
> spark-staging directory using the same file name, the job fails.
> Here is what transpires in the logs:
> The container where task 200.0 is running is killed and the task is lost:
> {code}
> 19/02/20 09:33:25 ERROR cluster.YarnClusterScheduler: Lost executor y on 
> t.y.z.com: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 
> GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>  19/02/20 09:33:25 WARN scheduler.TaskSetManager: Lost task 200.0 in stage 
> 0.0 (TID xxx, t.y.z.com, executor 93): ExecutorLostFailure (executor 93 
> exited caused by one of the running tasks) Reason: Container killed by YARN 
> for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider 
> boosting spark.yarn.executor.memoryOverhead.
> {code}
> The task is re-attempted on a different executor and fails because the 
> part-00200-blah-blah.c000.snappy.parquet file from the first task attempt 
> already exists:
> {code}
> 19/02/20 09:35:01 WARN scheduler.TaskSetManager: Lost task 200.1 in stage 0.0 
> (TID 594, tn.y.z.com, executor 70): org.apache.spark.SparkException: Task 
> failed while writing rows.
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:109)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet
>  for client a.b.c.d already exists
> {code}
> The job fails when the configured task attempts (spark.task.maxFailures) 
> have failed with the same error:
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 
> in stage 0.0 failed 20 times, most recent failure: Lost task 284.19 in stage 
> 0.0 (TID yyy, tm.y.z.com, executor 16): org.apache.spark.SparkException: Task 
> failed while writing rows.
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
>  ...
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet
>  for client i.p.a.d already exists
> {code}
> SPARK-26682 wasn't the root cause here, since there wasn't any stage 
> reattempt.
> This issue seems to happen when 
> spark.sql.sources.partitionOverwriteMode=dynamic. 
>  
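
For reference, a minimal sketch of the write pattern that exercises dynamic partition overwrite, the mode under which this issue is reported (paths and column names are illustrative):

{code:scala}
// Dynamic partition overwrite: only the partitions present in the incoming
// data are overwritten, via a .spark-staging directory like the one above.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val df = spark.range(1000)
  .selectExpr("id", "date_format(current_date(), 'yyyy-MM-dd') AS dt")
df.write
  .mode("overwrite")
  .partitionBy("dt")
  .parquet("/tmp/tmp_supply_feb1")
{code}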






[jira] [Commented] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files

2019-03-20 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797633#comment-16797633
 ] 

Dongjoon Hyun commented on SPARK-27194:
---

Thank you for checking that, [~ajithshetty].

> Job failures when task attempts do not clean up spark-staging parquet files
> ---
>
> Key: SPARK-27194
> URL: https://issues.apache.org/jira/browse/SPARK-27194
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.3.1, 2.3.2, 2.3.3
>Reporter: Reza Safi
>Priority: Major
>
> When a container fails for some reason (for example when killed by yarn for 
> exceeding memory limits), the subsequent task attempts for the tasks that 
> were running on that container all fail with a FileAlreadyExistsException. 
> The original task attempt does not seem to successfully call abortTask (or at 
> least its "best effort" delete is unsuccessful) and clean up the parquet file 
> it was writing to, so when later task attempts try to write to the same 
> spark-staging directory using the same file name, the job fails.
> Here is what transpires in the logs:
> The container where task 200.0 is running is killed and the task is lost:
> {code}
> 19/02/20 09:33:25 ERROR cluster.YarnClusterScheduler: Lost executor y on 
> t.y.z.com: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 
> GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>  19/02/20 09:33:25 WARN scheduler.TaskSetManager: Lost task 200.0 in stage 
> 0.0 (TID xxx, t.y.z.com, executor 93): ExecutorLostFailure (executor 93 
> exited caused by one of the running tasks) Reason: Container killed by YARN 
> for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider 
> boosting spark.yarn.executor.memoryOverhead.
> {code}
> The task is re-attempted on a different executor and fails because the 
> part-00200-blah-blah.c000.snappy.parquet file from the first task attempt 
> already exists:
> {code}
> 19/02/20 09:35:01 WARN scheduler.TaskSetManager: Lost task 200.1 in stage 0.0 
> (TID 594, tn.y.z.com, executor 70): org.apache.spark.SparkException: Task 
> failed while writing rows.
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:109)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet
>  for client a.b.c.d already exists
> {code}
> The job fails when the configured task attempts (spark.task.maxFailures) 
> have failed with the same error:
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 
> in stage 0.0 failed 20 times, most recent failure: Lost task 284.19 in stage 
> 0.0 (TID yyy, tm.y.z.com, executor 16): org.apache.spark.SparkException: Task 
> failed while writing rows.
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
>  ...
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet
>  for client i.p.a.d already exists
> {code}
> SPARK-26682 wasn't the root cause here, since there wasn't any stage 
> reattempt.
> This issue seems to happen when 
> spark.sql.sources.partitionOverwriteMode=dynamic. 
>  






[jira] [Assigned] (SPARK-27205) spark-shell with packages option fails to load transitive dependencies even ivy successfully pulls jars

2019-03-20 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen reassigned SPARK-27205:
-

Assignee: Jungtaek Lim

> spark-shell with packages option fails to load transitive dependencies even 
> ivy successfully pulls jars
> ---
>
> Key: SPARK-27205
> URL: https://issues.apache.org/jira/browse/SPARK-27205
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
>
> I found this bug while testing my patch regarding the Spark SQL Kafka module - 
> I tend to open spark-shell and link the Kafka module via `--packages`.
> When we run
> {code:java}
> ./bin/spark-shell --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0{code}
> we should be able to import "org.apache.kafka" in spark-shell, but it doesn't 
> work on the current master branch.
> There wasn't enough evidence, and I had no idea what was happening even with 
> the `--verbose` option, so I had to spend a couple of hours on a git bisect.
> It turned out that the commit introducing the bug was SPARK-26977 
> ([81dd21fda99da48ed76adb739a07d1dabf1ffb51|https://github.com/apache/spark/commit/81dd21fda99da48ed76adb739a07d1dabf1ffb51]).
>  






[jira] [Resolved] (SPARK-27205) spark-shell with packages option fails to load transitive dependencies even ivy successfully pulls jars

2019-03-20 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-27205.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24147
[https://github.com/apache/spark/pull/24147]

> spark-shell with packages option fails to load transitive dependencies even 
> ivy successfully pulls jars
> ---
>
> Key: SPARK-27205
> URL: https://issues.apache.org/jira/browse/SPARK-27205
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
> Fix For: 3.0.0
>
>
> I found this bug while testing my patch regarding the Spark SQL Kafka module - 
> I tend to open spark-shell and link the Kafka module via `--packages`.
> When we run
> {code:java}
> ./bin/spark-shell --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0{code}
> we should be able to import "org.apache.kafka" in spark-shell, but it doesn't 
> work on the current master branch.
> There wasn't enough evidence, and I had no idea what was happening even with 
> the `--verbose` option, so I had to spend a couple of hours on a git bisect.
> It turned out that the commit introducing the bug was SPARK-26977 
> ([81dd21fda99da48ed76adb739a07d1dabf1ffb51|https://github.com/apache/spark/commit/81dd21fda99da48ed76adb739a07d1dabf1ffb51]).
>  






[jira] [Commented] (SPARK-27220) Remove Yarn specific leftover from CoarseGrainedSchedulerBackend

2019-03-20 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797630#comment-16797630
 ] 

Dongjoon Hyun commented on SPARK-27220:
---

+1 for [~srowen]'s advice~

> Remove Yarn specific leftover from CoarseGrainedSchedulerBackend
> 
>
> Key: SPARK-27220
> URL: https://issues.apache.org/jira/browse/SPARK-27220
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core, YARN
>Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.0
>Reporter: Jacek Lewandowski
>Priority: Minor
>
> {{CoarseGrainedSchedulerBackend}} has the following field:
> {code:scala}
>   // The num of current max ExecutorId used to re-register appMaster
>   @volatile protected var currentExecutorIdCounter = 0
> {code}
> which is then updated:
> {code:scala}
>   case RegisterExecutor(executorId, executorRef, hostname, cores, 
> logUrls) =>
> ...
>   // This must be synchronized because variables mutated
>   // in this block are read when requesting executors
>   CoarseGrainedSchedulerBackend.this.synchronized {
> executorDataMap.put(executorId, data)
> if (currentExecutorIdCounter < executorId.toInt) {
>   currentExecutorIdCounter = executorId.toInt
> }
> ...
> {code}
> However, it is never really used in {{CoarseGrainedSchedulerBackend}}; its 
> only usage is in YARN-specific code. It should therefore be moved to the YARN 
> module, because {{executorId}} is a {{String}} and there is really no guarantee 
> that it is always an integer. It was introduced in SPARK-12864.






[jira] [Resolved] (SPARK-26729) Spark on Kubernetes tooling hardcodes default image names

2019-03-20 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-26729.

   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23846
[https://github.com/apache/spark/pull/23846]

> Spark on Kubernetes tooling hardcodes default image names
> -
>
> Key: SPARK-26729
> URL: https://issues.apache.org/jira/browse/SPARK-26729
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Rob Vesse
>Assignee: Rob Vesse
>Priority: Major
> Fix For: 3.0.0
>
>
> Both when creating images with {{bin/docker-image-tool.sh}} and when running 
> the Kubernetes integration tests the image names are hardcoded to {{spark}}, 
> {{spark-py}} and {{spark-r}}.
> If you are producing custom images in some other way (e.g. a CI/CD process 
> that doesn't use the script) or are required to use a different naming 
> convention due to company policy e.g. prefixing with vendor name (e.g. 
> {{apache-spark}}) then you can't directly create/test your images with the 
> desired names.
> You can of course simply re-tag the images with the desired names but this 
> might not be possible in some CI/CD pipelines especially if naming 
> conventions are being enforced at the registry level.
> It would be nice if the default image names were customisable.






[jira] [Assigned] (SPARK-26729) Spark on Kubernetes tooling hardcodes default image names

2019-03-20 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-26729:
--

Assignee: Rob Vesse

> Spark on Kubernetes tooling hardcodes default image names
> -
>
> Key: SPARK-26729
> URL: https://issues.apache.org/jira/browse/SPARK-26729
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Rob Vesse
>Assignee: Rob Vesse
>Priority: Major
>
> Both when creating images with {{bin/docker-image-tool.sh}} and when running 
> the Kubernetes integration tests the image names are hardcoded to {{spark}}, 
> {{spark-py}} and {{spark-r}}.
> If you are producing custom images in some other way (e.g. a CI/CD process 
> that doesn't use the script) or are required to use a different naming 
> convention due to company policy e.g. prefixing with vendor name (e.g. 
> {{apache-spark}}) then you can't directly create/test your images with the 
> desired names.
> You can of course simply re-tag the images with the desired names but this 
> might not be possible in some CI/CD pipelines especially if naming 
> conventions are being enforced at the registry level.
> It would be nice if the default image names were customisable.






[jira] [Resolved] (SPARK-27215) Correct the kryo configurations

2019-03-20 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-27215.

   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24156
[https://github.com/apache/spark/pull/24156]

> Correct the kryo configurations
> ---
>
> Key: SPARK-27215
> URL: https://issues.apache.org/jira/browse/SPARK-27215
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Lantao Jin
>Assignee: Lantao Jin
>Priority: Major
> Fix For: 3.0.0
>
>
> {code}
>   val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe")
> .booleanConf
> .createWithDefault(false)
>   val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool")
> .booleanConf
> .createWithDefault(true)
> {code}
> kyro should be kryo
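
For clarity, the corrected entries would presumably read (sketch based on the snippet above, not a verified patch):

{code:scala}
// Same config builders with the misspelled "kyro" keys fixed to "kryo".
val KRYO_USE_UNSAFE = ConfigBuilder("spark.kryo.unsafe")
  .booleanConf
  .createWithDefault(false)

val KRYO_USE_POOL = ConfigBuilder("spark.kryo.pool")
  .booleanConf
  .createWithDefault(true)
{code}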






[jira] [Assigned] (SPARK-27215) Correct the kryo configurations

2019-03-20 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-27215:
--

Assignee: Lantao Jin

> Correct the kryo configurations
> ---
>
> Key: SPARK-27215
> URL: https://issues.apache.org/jira/browse/SPARK-27215
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Lantao Jin
>Assignee: Lantao Jin
>Priority: Major
>
> {code}
>   val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe")
> .booleanConf
> .createWithDefault(false)
>   val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool")
> .booleanConf
> .createWithDefault(true)
> {code}
> kyro should be kryo






[jira] [Updated] (SPARK-27223) Remove private methods that skip conversion when passing user schemas for constructing a DataFrame

2019-03-20 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-27223:

Summary: Remove private methods that skip conversion when passing user 
schemas for constructing a DataFrame  (was: Remove private methods that allow 
no conversion when passing user schemas for constructing a DataFrame)

> Remove private methods that skip conversion when passing user schemas for 
> constructing a DataFrame
> --
>
> Key: SPARK-27223
> URL: https://issues.apache.org/jira/browse/SPARK-27223
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maryann Xue
>Priority: Minor
>
> When passing in a user schema to create a DataFrame, there might be 
> mismatched nullability between the user schema and the actual data. All 
> related public interfaces now perform catalyst conversion using the 
> user-provided schema, which catches such mismatches to avoid runtime errors 
> later on. However, there are private methods that allow this conversion to be 
> skipped, so we should remove them, as they may lead to confusion and 
> potential issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27223) Remove private methods that allow no conversion when passing user schemas for constructing a DataFrame

2019-03-20 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-27223:
---

 Summary: Remove private methods that allow no conversion when 
passing user schemas for constructing a DataFrame
 Key: SPARK-27223
 URL: https://issues.apache.org/jira/browse/SPARK-27223
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


When passing in a user schema to create a DataFrame, there might be mismatched 
nullability between the user schema and the actual data. All related public 
interfaces now perform catalyst conversion using the user provided schema, 
which catches such mismatches to avoid runtime errors later on. However, there 
are private methods that allow this conversion to be skipped, so we need to 
remove them, as they may lead to confusion and potential issues.
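
As a hedged, minimal illustration of the mismatch described above (hypothetical 
example, not code from the patch): a user schema declares a non-nullable field 
while the data contains a null. Going through the public createDataFrame path, 
the conversion against the user schema is expected to surface the problem with 
a clear error when the data is materialized, rather than letting it cause 
confusing failures further downstream.

{code:scala}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("nullability-demo").getOrCreate()

// The declared schema says `id` can never be null...
val schema = StructType(Seq(StructField("id", IntegerType, nullable = false)))

// ...but the actual data contains a null in that field.
val rows = spark.sparkContext.parallelize(Seq(Row(1), Row(null.asInstanceOf[Integer])))

// The public API converts rows against the user-provided schema, so the
// mismatch shows up as a clear error on execution instead of a confusing
// failure later on.
val df = spark.createDataFrame(rows, schema)
df.collect()
{code}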



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27222) Support Instant and LocalDate in Literal.apply

2019-03-20 Thread Maxim Gekk (JIRA)
Maxim Gekk created SPARK-27222:
--

 Summary: Support Instant and LocalDate in Literal.apply
 Key: SPARK-27222
 URL: https://issues.apache.org/jira/browse/SPARK-27222
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maxim Gekk


SPARK-26902 and SPARK-27008 added support for java.time.Instant and 
java.time.LocalDate as external types for TimestampType and DateType. This 
ticket aims to support literals of those types; in particular, Literal.apply 
needs new cases for java.time.Instant and java.time.LocalDate.
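
A minimal sketch (assumed helper names, not the actual patch) of the 
conversions such cases need: Catalyst represents TimestampType as microseconds 
since the epoch and DateType as days since the epoch.

{code:scala}
import java.time.{Instant, LocalDate}

// Hypothetical helpers illustrating the external-to-internal mapping.
def instantToMicros(i: Instant): Long =
  Math.addExact(Math.multiplyExact(i.getEpochSecond, 1000000L), (i.getNano / 1000).toLong)

def localDateToDays(d: LocalDate): Int = Math.toIntExact(d.toEpochDay)

// Literal.apply could then add cases along these lines (sketch only):
//   case i: Instant   => Literal(instantToMicros(i), TimestampType)
//   case d: LocalDate => Literal(localDateToDays(d), DateType)
{code}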



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27160) Incorrect Literal Casting of DecimalType in OrcFilters

2019-03-20 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-27160.
---
   Resolution: Fixed
 Assignee: Darcy Shen
Fix Version/s: 3.0.0

> Incorrect Literal Casting of DecimalType in OrcFilters
> --
>
> Key: SPARK-27160
> URL: https://issues.apache.org/jira/browse/SPARK-27160
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Darcy Shen
>Assignee: Darcy Shen
>Priority: Major
>  Labels: correctness
> Fix For: 3.0.0
>
>
> A DecimalType Literal should not be cast to Long.
> E.g., for `df.filter("x < 3.14")`, assuming df (with x in DecimalType) reads 
> from an ORC table and uses the native ORC reader with predicate push-down 
> enabled, we will push the `x < 3.14` predicate down to the ORC reader via a 
> SearchArgument.
> OrcFilters constructs the SearchArgument but does not handle the DecimalType 
> correctly: the previous implementation constructed `x < 3` from `x < 3.14`.
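
To make the truncation concrete, a small hedged illustration (generic Scala, 
not OrcFilters' actual conversion code):

{code:scala}
// Narrowing a decimal literal to Long silently changes the predicate.
val literal = BigDecimal("3.14")
val truncated = literal.toLong   // 3, so `x < 3.14` effectively becomes `x < 3`
assert(truncated == 3L)
// Correct handling must keep the full decimal value when building the
// pushed-down predicate instead of narrowing it to a Long.
{code}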



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27220) Remove Yarn specific leftover from CoarseGrainedSchedulerBackend

2019-03-20 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797460#comment-16797460
 ] 

Sean Owen commented on SPARK-27220:
---

Probably [~irashid] would know better

> Remove Yarn specific leftover from CoarseGrainedSchedulerBackend
> 
>
> Key: SPARK-27220
> URL: https://issues.apache.org/jira/browse/SPARK-27220
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core, YARN
>Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.0
>Reporter: Jacek Lewandowski
>Priority: Minor
>
> {{CoarseGrainedSchedulerBackend}} has the following field:
> {code:scala}
>   // The num of current max ExecutorId used to re-register appMaster
>   @volatile protected var currentExecutorIdCounter = 0
> {code}
> which is then updated:
> {code:scala}
>   case RegisterExecutor(executorId, executorRef, hostname, cores, 
> logUrls) =>
> ...
>   // This must be synchronized because variables mutated
>   // in this block are read when requesting executors
>   CoarseGrainedSchedulerBackend.this.synchronized {
> executorDataMap.put(executorId, data)
> if (currentExecutorIdCounter < executorId.toInt) {
>   currentExecutorIdCounter = executorId.toInt
> }
> ...
> {code}
> However, it is never actually used in {{CoarseGrainedSchedulerBackend}}; its 
> only usage is in YARN-specific code. It should therefore be moved to the YARN 
> module, because {{executorId}} is a {{String}} and there is no guarantee that 
> it is always an integer. It was introduced in SPARK-12864.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27094) Thread interrupt being swallowed while launching executors in YarnAllocator

2019-03-20 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-27094.

   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24017
[https://github.com/apache/spark/pull/24017]

> Thread interrupt being swallowed while launching executors in YarnAllocator
> ---
>
> Key: SPARK-27094
> URL: https://issues.apache.org/jira/browse/SPARK-27094
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 2.4.0
>Reporter: Marcelo Vanzin
>Assignee: Marcelo Vanzin
>Priority: Minor
> Fix For: 3.0.0
>
>
> When shutting down a SparkContext, the YarnAllocator thread is interrupted. 
> If the interrupt happens just at the wrong time, you'll see something like 
> this:
> {noformat}
> 19/03/05 07:04:20 WARN ScriptBasedMapping: Exception running blah
> java.io.IOException: java.lang.InterruptedException
>   at org.apache.hadoop.util.Shell.runCommand(Shell.java:578)
>   at org.apache.hadoop.util.Shell.run(Shell.java:478)
>   at 
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766)
>   at 
> org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251)
>   at 
> org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188)
>   at 
> org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
>   at 
> org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
>   at 
> org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
>   at 
> org.apache.spark.deploy.yarn.SparkRackResolver.resolve(SparkRackResolver.scala:37)
>   at 
> org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$handleAllocatedContainers$2.apply(YarnAllocator.scala:431)
>   at 
> org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$handleAllocatedContainers$2.apply(YarnAllocator.scala:430)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.deploy.yarn.YarnAllocator.handleAllocatedContainers(YarnAllocator.scala:430)
>   at 
> org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:281)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:556)
> {noformat}
> That means the YARN code being called ({{RackResolver}}) is swallowing the 
> interrupt, so the Spark allocator thread never exits. In this particular 
> app, the allocator was in the middle of allocating a very large number of 
> executors, so it seemed like the application was hung, and there were a lot 
> of executors coming up even though the context was being shut down.
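
As a generic, hedged sketch of one way to keep shutdown reliable when a loop 
calls into library code that may swallow interrupts (illustrative only, not 
necessarily the approach taken in the fix): pair the interrupt with an explicit 
volatile stop flag.

{code:scala}
// Illustrative helper, not Spark code: the stop flag makes shutdown work even
// if a third-party call swallows the InterruptedException.
class StoppableWorker(body: () => Unit) {
  @volatile private var stopped = false

  private val thread = new Thread(new Runnable {
    override def run(): Unit = {
      while (!stopped && !Thread.currentThread().isInterrupted) {
        body()  // may internally catch and swallow an InterruptedException
      }
    }
  })

  def start(): Unit = thread.start()

  def stop(): Unit = {
    stopped = true      // guarantees the loop exits on the next iteration
    thread.interrupt()  // still interrupt, so blocking calls return promptly
  }
}
{code}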



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27094) Thread interrupt being swallowed while launching executors in YarnAllocator

2019-03-20 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-27094:
--

Assignee: Marcelo Vanzin

> Thread interrupt being swallowed while launching executors in YarnAllocator
> ---
>
> Key: SPARK-27094
> URL: https://issues.apache.org/jira/browse/SPARK-27094
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 2.4.0
>Reporter: Marcelo Vanzin
>Assignee: Marcelo Vanzin
>Priority: Minor
>
> When shutting down a SparkContext, the YarnAllocator thread is interrupted. 
> If the interrupt happens just at the wrong time, you'll see something like 
> this:
> {noformat}
> 19/03/05 07:04:20 WARN ScriptBasedMapping: Exception running blah
> java.io.IOException: java.lang.InterruptedException
>   at org.apache.hadoop.util.Shell.runCommand(Shell.java:578)
>   at org.apache.hadoop.util.Shell.run(Shell.java:478)
>   at 
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766)
>   at 
> org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251)
>   at 
> org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188)
>   at 
> org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
>   at 
> org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
>   at 
> org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
>   at 
> org.apache.spark.deploy.yarn.SparkRackResolver.resolve(SparkRackResolver.scala:37)
>   at 
> org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$handleAllocatedContainers$2.apply(YarnAllocator.scala:431)
>   at 
> org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$handleAllocatedContainers$2.apply(YarnAllocator.scala:430)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.deploy.yarn.YarnAllocator.handleAllocatedContainers(YarnAllocator.scala:430)
>   at 
> org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:281)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:556)
> {noformat}
> That means the YARN code being called ({{RackResolver}}) is swallowing the 
> interrupt, so the Spark allocator thread never exits. In this particular 
> app, the allocator was in the middle of allocating a very large number of 
> executors, so it seemed like the application was hung, and there were a lot 
> of executors coming up even though the context was being shut down.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27219) Misleading exceptions in transport code's SASL fallback path

2019-03-20 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797435#comment-16797435
 ] 

Ajith S commented on SPARK-27219:
-

So do we just log a simple one-line message at WARN and print the stack trace 
at a finer (DEBUG, TRACE) log level?

> Misleading exceptions in transport code's SASL fallback path
> 
>
> Key: SPARK-27219
> URL: https://issues.apache.org/jira/browse/SPARK-27219
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Marcelo Vanzin
>Priority: Minor
>
> There are a couple of code paths in the SASL fallback handling that result in 
> misleading exceptions printed to logs. One of them is if a timeout occurs 
> during authentication; for example:
> {noformat}
> 19/03/15 11:21:37 WARN crypto.AuthClientBootstrap: New auth protocol failed, 
> trying SASL.
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
> at 
> org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
> at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:258)
> at 
> org.apache.spark.network.crypto.AuthClientBootstrap.doSparkAuth(AuthClientBootstrap.java:105)
> at 
> org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:79)
> at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:262)
> at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:192)
> at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100)
> at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
> ...
> Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task.
> at 
> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
> at 
> org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96)
> at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:254)
> ... 38 more
> 19/03/15 11:21:38 WARN server.TransportChannelHandler: Exception in 
> connection from vc1033.halxg.cloudera.com/10.17.216.43:7337
> java.lang.IllegalArgumentException: Frame length should be positive: 
> -3702202170875367528
> at 
> org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
> {noformat}
> The IllegalArgumentException shouldn't happen; it only occurs because the 
> code is ignoring the timeout and retrying, at which point the remote side is 
> in a different state and thus doesn't expect the message.
> The same line that prints that exception can also produce a noisy log message 
> when the remote side (e.g. an old shuffle service) does not understand the 
> new auth protocol. Since it is logged as a warning, it looks like something 
> is wrong when the code is just doing what's expected.
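
A hedged sketch of the logging split discussed in the comment above 
(hypothetical helper using plain SLF4J, not the project's decided change): one 
line at WARN, with the full stack trace only at DEBUG.

{code:scala}
import org.slf4j.LoggerFactory

object AuthFallbackLogging {
  private val log = LoggerFactory.getLogger(getClass)

  def logFallback(e: Throwable): Unit = {
    // A single line at WARN, so an expected fallback does not look like a failure.
    log.warn(s"New auth protocol failed, falling back to SASL: ${e.getMessage}")
    // The full stack trace is still available for troubleshooting at DEBUG.
    log.debug("Full stack trace for auth fallback", e)
  }
}
{code}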



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27220) Remove Yarn specific leftover from CoarseGrainedSchedulerBackend

2019-03-20 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797428#comment-16797428
 ] 

Ajith S commented on SPARK-27220:
-

# About making the *currentExecutorIdCounter* datatype consistent: yes, 
*currentExecutorIdCounter* starts out as an int in *CoarseGrainedSchedulerBackend*, 
but the *RegisterExecutor* message it handles carries the executorId as a 
String, which makes it confusing. Also, *CoarseGrainedExecutorBackend* sends 
*RegisterExecutor* with a String executorId in the YARN and Mesos cases.
 # About moving *currentExecutorIdCounter* out of 
*CoarseGrainedSchedulerBackend*: I am unsure about this, since 
*CoarseGrainedSchedulerBackend* just offers a mechanism to maintain executor 
ids that YARN reuses. (But I see Mesos ignores it completely and uses 
mesosTaskId instead, so it does make sense to move *currentExecutorIdCounter* 
out to YARN.)

cc [~srowen] [~dongjoon] [~hyukjin.kwon], any thoughts?

> Remove Yarn specific leftover from CoarseGrainedSchedulerBackend
> 
>
> Key: SPARK-27220
> URL: https://issues.apache.org/jira/browse/SPARK-27220
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core, YARN
>Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.0
>Reporter: Jacek Lewandowski
>Priority: Minor
>
> {{CoarseGrainedSchedulerBackend}} has the following field:
> {code:scala}
>   // The num of current max ExecutorId used to re-register appMaster
>   @volatile protected var currentExecutorIdCounter = 0
> {code}
> which is then updated:
> {code:scala}
>   case RegisterExecutor(executorId, executorRef, hostname, cores, 
> logUrls) =>
> ...
>   // This must be synchronized because variables mutated
>   // in this block are read when requesting executors
>   CoarseGrainedSchedulerBackend.this.synchronized {
> executorDataMap.put(executorId, data)
> if (currentExecutorIdCounter < executorId.toInt) {
>   currentExecutorIdCounter = executorId.toInt
> }
> ...
> {code}
> However, it is never actually used in {{CoarseGrainedSchedulerBackend}}; its 
> only usage is in YARN-specific code. It should therefore be moved to the YARN 
> module, because {{executorId}} is a {{String}} and there is no guarantee that 
> it is always an integer. It was introduced in SPARK-12864.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27221) Improve the assert error message in TreeNode.parseToJson

2019-03-20 Thread Shixiong Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-27221:
-
Summary: Improve the assert error message in TreeNode.parseToJson  (was: 
Improve the assert error message in TreeNode)

> Improve the assert error message in TreeNode.parseToJson
> 
>
> Key: SPARK-27221
> URL: https://issues.apache.org/jira/browse/SPARK-27221
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Minor
>
> TreeNode.parseToJson may throw an assertion error without any error message 
> when a TreeNode is not implemented properly, and it's hard to find the bad 
> TreeNode implementation.
> It's better to improve the error message to indicate the type of TreeNode.
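
A hedged sketch of the kind of message the ticket asks for (hypothetical helper 
and field names, not the actual patch): include the TreeNode subclass name in 
the assertion so the offending implementation is obvious.

{code:scala}
// Illustrative only: naming the node class turns a bare `assert(cond)` failure
// into something actionable.
def assertSerializable(node: AnyRef, fieldNames: Seq[String], values: Seq[Any]): Unit = {
  assert(fieldNames.length == values.length,
    s"${node.getClass.getSimpleName} declares ${fieldNames.length} fields " +
      s"(${fieldNames.mkString(", ")}) but provides ${values.length} values; " +
      "fix its definition so it can be serialized to JSON")
}
{code}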



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27221) Improve the assert error message in TreeNode

2019-03-20 Thread Shixiong Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-27221:
-
Description: 
TreeNode.parseToJson may throw an assertion error without any error message 
when a TreeNode is not implemented properly, and it's hard to find the bad 
TreeNode implementation.

It's better to improve the error message to indicate the type of TreeNode.

> Improve the assert error message in TreeNode
> 
>
> Key: SPARK-27221
> URL: https://issues.apache.org/jira/browse/SPARK-27221
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Minor
>
> TreeNode.parseToJson may throw an assertion error without any error message 
> when a TreeNode is not implemented properly, and it's hard to find the bad 
> TreeNode implementation.
> It's better to improve the error message to indicate the type of TreeNode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27221) Improve the assert error message in TreeNode

2019-03-20 Thread Shixiong Zhu (JIRA)
Shixiong Zhu created SPARK-27221:


 Summary: Improve the assert error message in TreeNode
 Key: SPARK-27221
 URL: https://issues.apache.org/jira/browse/SPARK-27221
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Shixiong Zhu
Assignee: Shixiong Zhu






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files

2019-03-20 Thread Reza Safi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reza Safi updated SPARK-27194:
--
Issue Type: Bug  (was: Improvement)

> Job failures when task attempts do not clean up spark-staging parquet files
> ---
>
> Key: SPARK-27194
> URL: https://issues.apache.org/jira/browse/SPARK-27194
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.3.1, 2.3.2
>Reporter: Reza Safi
>Priority: Major
>
> When a container fails for some reason (for example when killed by yarn for 
> exceeding memory limits), the subsequent task attempts for the tasks that 
> were running on that container all fail with a FileAlreadyExistsException. 
> The original task attempt does not seem to successfully call abortTask (or at 
> least its "best effort" delete is unsuccessful) and clean up the parquet file 
> it was writing to, so when later task attempts try to write to the same 
> spark-staging directory using the same file name, the job fails.
> Here is what transpires in the logs:
> The container where task 200.0 is running is killed and the task is lost:
> {code}
> 19/02/20 09:33:25 ERROR cluster.YarnClusterScheduler: Lost executor y on 
> t.y.z.com: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 
> GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>  19/02/20 09:33:25 WARN scheduler.TaskSetManager: Lost task 200.0 in stage 
> 0.0 (TID xxx, t.y.z.com, executor 93): ExecutorLostFailure (executor 93 
> exited caused by one of the running tasks) Reason: Container killed by YARN 
> for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider 
> boosting spark.yarn.executor.memoryOverhead.
> {code}
> The task is re-attempted on a different executor and fails because the 
> part-00200-blah-blah.c000.snappy.parquet file from the first task attempt 
> already exists:
> {code}
> 19/02/20 09:35:01 WARN scheduler.TaskSetManager: Lost task 200.1 in stage 0.0 
> (TID 594, tn.y.z.com, executor 70): org.apache.spark.SparkException: Task 
> failed while writing rows.
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:109)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet
>  for client a.b.c.d already exists
> {code}
> The job fails when all the configured task attempts (spark.task.maxFailures) 
> have failed with the same error:
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 
> in stage 0.0 failed 20 times, most recent failure: Lost task 284.19 in stage 
> 0.0 (TID yyy, tm.y.z.com, executor 16): org.apache.spark.SparkException: Task 
> failed while writing rows.
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
>  ...
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet
>  for client i.p.a.d already exists
> {code}
> SPARK-26682 wasn't the root cause here, since there wasn't any stage 
> reattempt.
> This issue seems to happen when 
> spark.sql.sources.partitionOverwriteMode=dynamic. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27112) Spark Scheduler encounters two independent Deadlocks when trying to kill executors either due to dynamic allocation or blacklisting

2019-03-20 Thread Imran Rashid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797321#comment-16797321
 ] 

Imran Rashid commented on SPARK-27112:
--

[~Dhruve Ashar] -- rc8 is already defined; there is nothing I (or anybody else) 
can do to change that. I simply updated the jira to reflect that. However, you 
might request that rc8 does not become 2.4.1, and that we instead roll an rc9 
with this fix. You should respond to the VOTE thread for rc8 on the dev list 
with your concerns; that's the right forum for this (thanks for the reminder 
btw, I will mention it there as well).

> Spark Scheduler encounters two independent Deadlocks when trying to kill 
> executors either due to dynamic allocation or blacklisting 
> 
>
> Key: SPARK-27112
> URL: https://issues.apache.org/jira/browse/SPARK-27112
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Parth Gandhi
>Assignee: Parth Gandhi
>Priority: Major
> Fix For: 2.3.4, 2.4.2, 3.0.0
>
> Attachments: Screen Shot 2019-02-26 at 4.10.26 PM.png, Screen Shot 
> 2019-02-26 at 4.10.48 PM.png, Screen Shot 2019-02-26 at 4.11.11 PM.png, 
> Screen Shot 2019-02-26 at 4.11.26 PM.png
>
>
> Recently, a few Spark users in the organization have reported that their jobs 
> were getting stuck. On further analysis, it was found that there are two 
> independent deadlocks, each of which occurs under different circumstances. 
> The screenshots for these two deadlocks are attached here. 
> We were able to reproduce the deadlocks with the following piece of code:
>  
> {code:java}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark._
> import org.apache.spark.TaskContext
> // Simple example of Word Count in Scala
> object ScalaWordCount {
> def main(args: Array[String]) {
> if (args.length < 2) {
> System.err.println("Usage: ScalaWordCount <inputFilesUri> <outputFilesUri>")
> System.exit(1)
> }
> val conf = new SparkConf().setAppName("Scala Word Count")
> val sc = new SparkContext(conf)
> // get the input file uri
> val inputFilesUri = args(0)
> // get the output file uri
> val outputFilesUri = args(1)
> while (true) {
> val textFile = sc.textFile(inputFilesUri)
> val counts = textFile.flatMap(line => line.split(" "))
> .map(word => {if (TaskContext.get.partitionId == 5 && 
> TaskContext.get.attemptNumber == 0) throw new Exception("Fail for 
> blacklisting") else (word, 1)})
> .reduceByKey(_ + _)
> counts.saveAsTextFile(outputFilesUri)
> val conf: Configuration = new Configuration()
> val path: Path = new Path(outputFilesUri)
> val hdfs: FileSystem = FileSystem.get(conf)
> hdfs.delete(path, true)
> }
> sc.stop()
> }
> }
> {code}
>  
> Additionally, to ensure that the deadlock surfaces up soon enough, I also 
> added a small delay in the Spark code here:
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala#L256]
>  
> {code:java}
> executorIdToFailureList.remove(exec)
> updateNextExpiryTime()
> Thread.sleep(2000)
> killBlacklistedExecutor(exec)
> {code}
>  
> Also make sure that the following configs are set when launching the above 
> spark job:
> *spark.blacklist.enabled=true*
> *spark.blacklist.killBlacklistedExecutors=true*
> *spark.blacklist.application.maxFailedTasksPerExecutor=1*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27220) Remove Yarn specific leftover from CoarseGrainedSchedulerBackend

2019-03-20 Thread Jacek Lewandowski (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacek Lewandowski updated SPARK-27220:
--
Description: 
{{CoarseGrainedSchedulerBackend}} has the following field:

{code:scala}
  // The num of current max ExecutorId used to re-register appMaster
  @volatile protected var currentExecutorIdCounter = 0
{code}

which is then updated:

{code:scala}
  case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) 
=>
...
  // This must be synchronized because variables mutated
  // in this block are read when requesting executors
  CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
  currentExecutorIdCounter = executorId.toInt
}
...
{code}

However, it is never actually used in {{CoarseGrainedSchedulerBackend}}; its 
only usage is in YARN-specific code. It should therefore be moved to the YARN 
module, because {{executorId}} is a {{String}} and there is no guarantee that 
it is always an integer. It was introduced in SPARK-12864.

  was:
{{CoarseGrainedSchedulerBackend}} has the following field:

{code:scala}
  // The num of current max ExecutorId used to re-register appMaster
  @volatile protected var currentExecutorIdCounter = 0
{code}

which is then updated:

{code:scala}
  case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) 
=>
...
  // This must be synchronized because variables mutated
  // in this block are read when requesting executors
  CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
  currentExecutorIdCounter = executorId.toInt
}
...
{code}

However it is never really used in {{CoarseGrainedSchedulerBackend}}. Its only 
usage is in Yarn-specific code. It should be moved to Yarn then because 
{{executorId}} is a {{String}} and there are really no guarantees that it is 
always an integer.



> Remove Yarn specific leftover from CoarseGrainedSchedulerBackend
> 
>
> Key: SPARK-27220
> URL: https://issues.apache.org/jira/browse/SPARK-27220
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core, YARN
>Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.0
>Reporter: Jacek Lewandowski
>Priority: Minor
>
> {{CoarseGrainedSchedulerBackend}} has the following field:
> {code:scala}
>   // The num of current max ExecutorId used to re-register appMaster
>   @volatile protected var currentExecutorIdCounter = 0
> {code}
> which is then updated:
> {code:scala}
>   case RegisterExecutor(executorId, executorRef, hostname, cores, 
> logUrls) =>
> ...
>   // This must be synchronized because variables mutated
>   // in this block are read when requesting executors
>   CoarseGrainedSchedulerBackend.this.synchronized {
> executorDataMap.put(executorId, data)
> if (currentExecutorIdCounter < executorId.toInt) {
>   currentExecutorIdCounter = executorId.toInt
> }
> ...
> {code}
> However, it is never actually used in {{CoarseGrainedSchedulerBackend}}; its 
> only usage is in YARN-specific code. It should therefore be moved to the YARN 
> module, because {{executorId}} is a {{String}} and there is no guarantee that 
> it is always an integer. It was introduced in SPARK-12864.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27220) Remove Yarn specific leftover from CoarseGrainedSchedulerBackend

2019-03-20 Thread Jacek Lewandowski (JIRA)
Jacek Lewandowski created SPARK-27220:
-

 Summary: Remove Yarn specific leftover from 
CoarseGrainedSchedulerBackend
 Key: SPARK-27220
 URL: https://issues.apache.org/jira/browse/SPARK-27220
 Project: Spark
  Issue Type: Task
  Components: Spark Core, YARN
Affects Versions: 2.4.0, 2.3.3, 2.2.3, 2.1.3, 2.0.2
Reporter: Jacek Lewandowski


{{CoarseGrainedSchedulerBackend}} has the following field:

{code:scala}
  // The num of current max ExecutorId used to re-register appMaster
  @volatile protected var currentExecutorIdCounter = 0
{code}

which is then updated:

{code:scala}
  case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) 
=>
...
  // This must be synchronized because variables mutated
  // in this block are read when requesting executors
  CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
  currentExecutorIdCounter = executorId.toInt
}
...
{code}

However, it is never actually used in {{CoarseGrainedSchedulerBackend}}; its 
only usage is in YARN-specific code. It should therefore be moved to the YARN 
module, because {{executorId}} is a {{String}} and there is no guarantee that 
it is always an integer.
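
One possible shape of that move, as a hedged sketch (hypothetical helper, not 
an actual patch): derive the counter in YARN-specific code from the registered 
executor ids, tolerating non-numeric ids instead of assuming executorId.toInt 
always succeeds.

{code:scala}
import scala.util.Try

// Illustrative only: the maximum numeric executor id seen so far, ignoring any
// ids that are not integers.
def maxNumericExecutorId(executorIds: Iterable[String]): Int =
  executorIds.flatMap(id => Try(id.toInt).toOption).foldLeft(0)(math.max)
{code}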




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27219) Misleading exceptions in transport code's SASL fallback path

2019-03-20 Thread Marcelo Vanzin (JIRA)
Marcelo Vanzin created SPARK-27219:
--

 Summary: Misleading exceptions in transport code's SASL fallback 
path
 Key: SPARK-27219
 URL: https://issues.apache.org/jira/browse/SPARK-27219
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Marcelo Vanzin


There are a couple of code paths in the SASL fallback handling that result in 
misleading exceptions printed to logs. One of them is if a timeout occurs 
during authentication; for example:

{noformat}
19/03/15 11:21:37 WARN crypto.AuthClientBootstrap: New auth protocol failed, 
trying SASL.
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
waiting for task.
at 
org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
at 
org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:258)
at 
org.apache.spark.network.crypto.AuthClientBootstrap.doSparkAuth(AuthClientBootstrap.java:105)
at 
org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:79)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:262)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:192)
at 
org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
...
Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task.
at 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
at 
org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96)
at 
org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:254)
... 38 more
19/03/15 11:21:38 WARN server.TransportChannelHandler: Exception in connection 
from vc1033.halxg.cloudera.com/10.17.216.43:7337
java.lang.IllegalArgumentException: Frame length should be positive: 
-3702202170875367528
at 
org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
{noformat}

The IllegalArgumentException shouldn't happen; it only occurs because the code 
is ignoring the timeout and retrying, at which point the remote side is in a 
different state and thus doesn't expect the message.

The same line that prints that exception can also produce a noisy log message 
when the remote side (e.g. an old shuffle service) does not understand the new 
auth protocol. Since it is logged as a warning, it looks like something is 
wrong when the code is just doing what's expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27112) Spark Scheduler encounters two independent Deadlocks when trying to kill executors either due to dynamic allocation or blacklisting

2019-03-20 Thread Dhruve Ashar (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797253#comment-16797253
 ] 

Dhruve Ashar commented on SPARK-27112:
--

[~irashid] - I think this is a critical bug, and since it is resolved we should 
include it in rc8.

> Spark Scheduler encounters two independent Deadlocks when trying to kill 
> executors either due to dynamic allocation or blacklisting 
> 
>
> Key: SPARK-27112
> URL: https://issues.apache.org/jira/browse/SPARK-27112
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Parth Gandhi
>Assignee: Parth Gandhi
>Priority: Major
> Fix For: 2.3.4, 2.4.2, 3.0.0
>
> Attachments: Screen Shot 2019-02-26 at 4.10.26 PM.png, Screen Shot 
> 2019-02-26 at 4.10.48 PM.png, Screen Shot 2019-02-26 at 4.11.11 PM.png, 
> Screen Shot 2019-02-26 at 4.11.26 PM.png
>
>
> Recently, a few Spark users in the organization have reported that their jobs 
> were getting stuck. On further analysis, it was found that there are two 
> independent deadlocks, each of which occurs under different circumstances. 
> The screenshots for these two deadlocks are attached here. 
> We were able to reproduce the deadlocks with the following piece of code:
>  
> {code:java}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark._
> import org.apache.spark.TaskContext
> // Simple example of Word Count in Scala
> object ScalaWordCount {
> def main(args: Array[String]) {
> if (args.length < 2) {
> System.err.println("Usage: ScalaWordCount <inputFilesUri> <outputFilesUri>")
> System.exit(1)
> }
> val conf = new SparkConf().setAppName("Scala Word Count")
> val sc = new SparkContext(conf)
> // get the input file uri
> val inputFilesUri = args(0)
> // get the output file uri
> val outputFilesUri = args(1)
> while (true) {
> val textFile = sc.textFile(inputFilesUri)
> val counts = textFile.flatMap(line => line.split(" "))
> .map(word => {if (TaskContext.get.partitionId == 5 && 
> TaskContext.get.attemptNumber == 0) throw new Exception("Fail for 
> blacklisting") else (word, 1)})
> .reduceByKey(_ + _)
> counts.saveAsTextFile(outputFilesUri)
> val conf: Configuration = new Configuration()
> val path: Path = new Path(outputFilesUri)
> val hdfs: FileSystem = FileSystem.get(conf)
> hdfs.delete(path, true)
> }
> sc.stop()
> }
> }
> {code}
>  
> Additionally, to ensure that the deadlock surfaces up soon enough, I also 
> added a small delay in the Spark code here:
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala#L256]
>  
> {code:java}
> executorIdToFailureList.remove(exec)
> updateNextExpiryTime()
> Thread.sleep(2000)
> killBlacklistedExecutor(exec)
> {code}
>  
> Also make sure that the following configs are set when launching the above 
> spark job:
> *spark.blacklist.enabled=true*
> *spark.blacklist.killBlacklistedExecutors=true*
> *spark.blacklist.application.maxFailedTasksPerExecutor=1*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files

2019-03-20 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797241#comment-16797241
 ] 

Ajith S commented on SPARK-27194:
-

Hi [~dongjoon], I have some analysis here: 
[https://github.com/apache/spark/pull/24142#issuecomment-474866759]. Please let 
me know your views.

> Job failures when task attempts do not clean up spark-staging parquet files
> ---
>
> Key: SPARK-27194
> URL: https://issues.apache.org/jira/browse/SPARK-27194
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, SQL
>Affects Versions: 2.3.1, 2.3.2
>Reporter: Reza Safi
>Priority: Major
>
> When a container fails for some reason (for example when killed by yarn for 
> exceeding memory limits), the subsequent task attempts for the tasks that 
> were running on that container all fail with a FileAlreadyExistsException. 
> The original task attempt does not seem to successfully call abortTask (or at 
> least its "best effort" delete is unsuccessful) and clean up the parquet file 
> it was writing to, so when later task attempts try to write to the same 
> spark-staging directory using the same file name, the job fails.
> Here is what transpires in the logs:
> The container where task 200.0 is running is killed and the task is lost:
> {code}
> 19/02/20 09:33:25 ERROR cluster.YarnClusterScheduler: Lost executor y on 
> t.y.z.com: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 
> GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>  19/02/20 09:33:25 WARN scheduler.TaskSetManager: Lost task 200.0 in stage 
> 0.0 (TID xxx, t.y.z.com, executor 93): ExecutorLostFailure (executor 93 
> exited caused by one of the running tasks) Reason: Container killed by YARN 
> for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider 
> boosting spark.yarn.executor.memoryOverhead.
> {code}
> The task is re-attempted on a different executor and fails because the 
> part-00200-blah-blah.c000.snappy.parquet file from the first task attempt 
> already exists:
> {code}
> 19/02/20 09:35:01 WARN scheduler.TaskSetManager: Lost task 200.1 in stage 0.0 
> (TID 594, tn.y.z.com, executor 70): org.apache.spark.SparkException: Task 
> failed while writing rows.
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:109)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet
>  for client a.b.c.d already exists
> {code}
> The job fails when all the configured task attempts (spark.task.maxFailures) 
> have failed with the same error:
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 
> in stage 0.0 failed 20 times, most recent failure: Lost task 284.19 in stage 
> 0.0 (TID yyy, tm.y.z.com, executor 16): org.apache.spark.SparkException: Task 
> failed while writing rows.
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
>  ...
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet
>  for client i.p.a.d already exists
> {code}
> SPARK-26682 wasn't the root cause here, since there wasn't any stage 
> reattempt.
> This issue seems to happen when 
> spark.sql.sources.partitionOverwriteMode=dynamic. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files

2019-03-20 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797216#comment-16797216
 ] 

Ajith S edited comment on SPARK-27194 at 3/20/19 2:38 PM:
--

[~dongjoon] Yes, I tried with Spark 2.3.3 and the issue persists. Here are the 
operations I performed:
{code:java}
spark.sql.sources.partitionOverwriteMode=DYNAMIC{code}
{code:java}
create table t1 (i int, part1 int, part2 int) using parquet partitioned by 
(part1, part2)
insert into t1 partition(part1=1, part2=1) select 1
insert overwrite table t1 partition(part1=1, part2=1) select 2
insert overwrite table t1 partition(part1=2, part2) select 2, 2   // here the 
exec is killed and task respawns{code}
and here is the full stack trace from 2.3.3:
{code:java}
2019-03-20 19:58:06 WARN TaskSetManager:66 - Lost task 0.1 in stage 2.0 (TID 3, 
QWERTY, executor 2): org.apache.spark.SparkException: Task failed while writing 
rows.
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
/user/hive/warehouse/t2/.spark-staging-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1/part1=2/part2=2/part-0-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1.c000.snappy.parquet
 for client 127.0.0.1 already exists
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2578)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2465)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2349)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:398)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1653)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at 
org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:236)
at 
org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
at 
org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at 

[jira] [Commented] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files

2019-03-20 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797216#comment-16797216
 ] 

Ajith S commented on SPARK-27194:
-

[~dongjoon] Yes, I tried with Spark 2.3.3 and the issue persists. Here are the 
operations I performed:
{code:java}
create table t1 (i int, part1 int, part2 int) using parquet partitioned by 
(part1, part2)
insert into t1 partition(part1=1, part2=1) select 1
insert overwrite table t1 partition(part1=1, part2=1) select 2
insert overwrite table t1 partition(part1=2, part2) select 2, 2   // here the 
exec is killed and task respawns{code}
and here is the full stack trace from 2.3.3:
{code:java}
2019-03-20 19:58:06 WARN TaskSetManager:66 - Lost task 0.1 in stage 2.0 (TID 3, 
QWERTY, executor 2): org.apache.spark.SparkException: Task failed while writing 
rows.
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
/user/hive/warehouse/t2/.spark-staging-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1/part1=2/part2=2/part-0-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1.c000.snappy.parquet
 for client 127.0.0.1 already exists
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2578)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2465)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2349)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:398)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1653)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at 
org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:236)
at 
org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
at 
org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at 

[jira] [Created] (SPARK-27218) spark-sql-kafka-0-10 startingOffset=earliest not working as expected with streaming

2019-03-20 Thread Emanuele Sabellico (JIRA)
Emanuele Sabellico created SPARK-27218:
--

 Summary: spark-sql-kafka-0-10 startingOffset=earliest not working 
as expected with streaming
 Key: SPARK-27218
 URL: https://issues.apache.org/jira/browse/SPARK-27218
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.0
 Environment: Windows 10, spark-2.4.0-bin-hadoop2.7
Reporter: Emanuele Sabellico


Hi, I'm trying to stream a Kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 
with code like this:
{noformat}
spark.readStream
.format("kafka")
.option("subscribe", topics)
.option("startingOffsets", "earliest")
.load()
.select(from_avro_with_schema_registry($"value", avroOptions) as 
"body"){noformat}
I find that Spark doesn't start from the earliest offset but from the latest. 
Or rather: initially it gets the earliest offsets, but then it seeks to the 
end, skipping the messages in between.
 In the logs I find this:
{noformat}
2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
{"test1":{"0":1740}}
2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
 Resetting offset for partition test1-0 to offset 15922.
{noformat}
Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ the 
method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is a 
_consumer.seekToEnd(partitions)_

According to the documentation I was expecting the stream to start from the 
earliest offset in this case. Is there something that I'm getting wrong or 
doing wrong?
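
For reference, here is a minimal, self-contained sketch of my setup (the broker 
address, topic name, console sink and checkpoint path below are placeholders, 
not the real ones):
{code:scala}
// Sketch only: read the topic from the earliest offsets and print to the console.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test1")
  .option("startingOffsets", "earliest")
  .load()

// With no prior checkpoint, the first micro-batch should cover everything from
// the earliest available offsets rather than seeking to the latest ones.
df.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/spark-27218-checkpoint")
  .start()
  .awaitTermination()
{code}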

Thanks in advance!

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27218) spark-sql-kafka-0-10 startingOffset=earliest not working as expected with streaming

2019-03-20 Thread Emanuele Sabellico (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emanuele Sabellico updated SPARK-27218:
---
Description: 
Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 
with a code like this:
{noformat}
spark.readStream
.format("kafka")
.option("subscribe", topics)
.option("startingOffsets", "earliest")
.load()
.select(from_avro_with_schema_registry($"value", avroOptions) as 
"body"){noformat}
I find that Spark doesn't start from the earliest offset but from the latest. 
Or better, initially it gets the earliest offsets but then it does a seek to 
end, skipping the messages in-between.
 In the logs I find this:
{noformat}
2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
{"test1":{"0":1740}}
2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
 Resetting offset for partition test1-0 to offset 15922.
{noformat}
Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ the 
method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is a 
_consumer.seekToEnd(partitions)_

According to the documentation I was expecting that the streaming would have 
started from the earliest offset in this case. Is there something that I'm 
getting wrong or doing wrong?

Thanks in advance!

 

  was:
Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 
with a code like this:
{noformat}
spark.readStream
.format("kafka")
.option("subscribe", topics)
.option("startingOffsets", "earliest")
.load()
.select(from_avro_with_schema_registry($"value", avroOptions) as 
"body"){noformat}
I find that Spark doesn't start from the earliest offset but from the latest. 
Or better, initially it gets the earliest offsets but then it does a seek to 
end, skipping the messages in-between.
 In the logs I find this:
{noformat}
2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
{"test1":{"0":1740}}
2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
 Resetting offset for partition test1-0 to offset 15922.
{noformat}
Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ the 
method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is a 
_consumer.seekToEnd(partitions)_

According to the documentation I was expecting that the streaming started from 
the earliest offset in this case. Is there something that I'm getting wrong or 
doing wrong?

Thanks in advance!

 


> spark-sql-kafka-0-10 startingOffset=earliest not working as expected with 
> streaming
> ---
>
> Key: SPARK-27218
> URL: https://issues.apache.org/jira/browse/SPARK-27218
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Windows 10, spark-2.4.0-bin-hadoop2.7
>Reporter: Emanuele Sabellico
>Priority: Minor
>
> Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 
> with a code like this:
> {noformat}
> spark.readStream
> .format("kafka")
> .option("subscribe", topics)
> .option("startingOffsets", "earliest")
> .load()
> .select(from_avro_with_schema_registry($"value", avroOptions) as 
> "body"){noformat}
> I find that Spark doesn't start from the earliest offset but from the latest. 
> Or better, initially it gets the earliest offsets but then it does a seek to 
> end, skipping the messages in-between.
>  In the logs I find this:
> {noformat}
> 2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
> {"test1":{"0":1740}}
> 2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
>  Resetting offset for partition test1-0 to offset 15922.
> {noformat}
> Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ 
> the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is 
> a _consumer.seekToEnd(partitions)_
> According to the documentation I was expecting that the streaming would have 
> started from the earliest offset in this case. Is there something that I'm 
> getting wrong or doing wrong?
> Thanks in advance!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27218) spark-sql-kafka-0-10 startingOffset=earliest not working as expected with streaming

2019-03-20 Thread Emanuele Sabellico (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emanuele Sabellico updated SPARK-27218:
---
Description: 
Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 
with a code like this:
{noformat}
spark.readStream
.format("kafka")
.option("subscribe", topics)
.option("startingOffsets", "earliest")
.load()
.select(from_avro_with_schema_registry($"value", avroOptions) as 
"body"){noformat}
I find that Spark doesn't start from the earliest offset but from the latest. 
Or better, initially it gets the earliest offsets but then it does a seek to 
end, skipping the messages in-between.
 In the logs I find this:
{noformat}
2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
{"test1":{"0":1740}}
2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
 Resetting offset for partition test1-0 to offset 15922.
{noformat}
Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ the 
method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is a 
_consumer.seekToEnd(partitions)_

According to the documentation I was expecting that the streaming started from 
the earliest offset in this case. Is there something that I'm getting wrong or 
doing wrong?

Thanks in advance!

 

  was:
Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 
with a code like this:
{noformat}
spark.readStream
.format("kafka")
.option("subscribe", topics)
.option("startingOffsets", "earliest")
.load()
.select(from_avro_with_schema_registry($"value", avroOptions) as 
"body"){noformat}
I find that Spark doesn't start from the earliest offset but from the latest. 
Or better, initially it gets the earliest offsets but then it does a seek to 
end, skipping the messages in between.
 In the logs I find this:
{noformat}
2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
{"test1":{"0":1740}}
2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
 Resetting offset for partition test1-0 to offset 15922.
{noformat}
Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ the 
method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is a 
_consumer.seekToEnd(partitions)_

According to the documentation I was expecting that the streaming started from 
the earliest offset in this case. Is there something that I'm getting wrong or 
doing wrong?

Thanks in advance!

 


> spark-sql-kafka-0-10 startingOffset=earliest not working as expected with 
> streaming
> ---
>
> Key: SPARK-27218
> URL: https://issues.apache.org/jira/browse/SPARK-27218
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Windows 10, spark-2.4.0-bin-hadoop2.7
>Reporter: Emanuele Sabellico
>Priority: Minor
>
> Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 
> with a code like this:
> {noformat}
> spark.readStream
> .format("kafka")
> .option("subscribe", topics)
> .option("startingOffsets", "earliest")
> .load()
> .select(from_avro_with_schema_registry($"value", avroOptions) as 
> "body"){noformat}
> I find that Spark doesn't start from the earliest offset but from the latest. 
> Or better, initially it gets the earliest offsets but then it does a seek to 
> end, skipping the messages in-between.
>  In the logs I find this:
> {noformat}
> 2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
> {"test1":{"0":1740}}
> 2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
>  Resetting offset for partition test1-0 to offset 15922.
> {noformat}
> Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ 
> the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is 
> a _consumer.seekToEnd(partitions)_
> According to the documentation I was expecting that the streaming started 
> from the earliest offset in this case. Is there something that I'm getting 
> wrong or doing wrong?
> Thanks in advance!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27217) Nested schema pruning doesn't work for aggregation e.g. `sum`.

2019-03-20 Thread colin fang (JIRA)
colin fang created SPARK-27217:
--

 Summary: Nested schema pruning doesn't work for aggregation e.g. 
`sum`.
 Key: SPARK-27217
 URL: https://issues.apache.org/jira/browse/SPARK-27217
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: colin fang


Since SPARK-4502 is fixed, I would expect queries such as `select sum(b.x)` 
not to have to read the other nested fields.

{code:python}
rdd = spark.range(1000).rdd.map(lambda x: [x.id+3, [x.id+1, x.id-1]])
df = spark.createDataFrame(rdd, schema='a:int,b:struct<x:int,y:int>')
df.repartition(1).write.mode('overwrite').parquet('test.parquet')
df = spark.read.parquet('test.parquet')

spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'true')
df.select('b.x').explain()
# ReadSchema: struct<b:struct<x:int>>

spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'false')
df.select('b.x').explain()
# ReadSchema: struct<b:struct<x:int,y:int>>

spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'true')
df.selectExpr('sum(b.x)').explain()
#  ReadSchema: struct<b:struct<x:int,y:int>>
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27206) Using slice method with streaming api's Interval on DStream

2019-03-20 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi resolved SPARK-27206.
---
Resolution: Invalid

> Using slice method with streaming api's Interval on DStream
> ---
>
> Key: SPARK-27206
> URL: https://issues.apache.org/jira/browse/SPARK-27206
> Project: Spark
>  Issue Type: Question
>  Components: DStreams
>Affects Versions: 2.4.0
> Environment: Linux, standalone spark
>Reporter: Aarthi
>Priority: Major
>
> Hi. I am in need to slice a DStream that receives data from a custom receiver 
> (implemented with Spark's Receiver). There are two options to do this
> 1. slice(fromTime: 
> [Time|http://spark.apache.org/docs/2.3.1/api/scala/org/apache/spark/streaming/Time.html],
>  toTime: 
> [Time|http://spark.apache.org/docs/2.3.1/api/scala/org/apache/spark/streaming/Time.html])
> 2. slice(interval: Interval)
> Although the second option is a public method, the Interval class is private. 
> Can you please help me understand how to use this api with 
> org.apache.spark.streaming.Interval ?
> Thanks, Aarthi
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27216) Kryo serialization with RoaringBitmap

2019-03-20 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27216:


Assignee: (was: Apache Spark)

> Kryo serialization with RoaringBitmap
> -
>
> Key: SPARK-27216
> URL: https://issues.apache.org/jira/browse/SPARK-27216
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.3, 2.4.0, 3.0.0
>Reporter: Lantao Jin
>Priority: Major
>
> HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks. But 
> RoaringBitmap couldn't be ser/deser with unsafe KryoSerializer.
> We can use below UT to reproduce:
> {code}
>   test("kryo serialization with RoaringBitmap") {
> val bitmap = new RoaringBitmap
> bitmap.add(1787)
> val safeSer = new KryoSerializer(conf).newInstance()
> val bitmap2 : RoaringBitmap = 
> safeSer.deserialize(safeSer.serialize(bitmap))
> assert(bitmap2.equals(bitmap))
> conf.set("spark.kryo.unsafe", "true")
> val unsafeSer = new KryoSerializer(conf).newInstance()
> val bitmap3 : RoaringBitmap = 
> unsafeSer.deserialize(unsafeSer.serialize(bitmap))
> assert(bitmap3.equals(bitmap)) // this will fail
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27216) Kryo serialization with RoaringBitmap

2019-03-20 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27216:


Assignee: Apache Spark

> Kryo serialization with RoaringBitmap
> -
>
> Key: SPARK-27216
> URL: https://issues.apache.org/jira/browse/SPARK-27216
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.3, 2.4.0, 3.0.0
>Reporter: Lantao Jin
>Assignee: Apache Spark
>Priority: Major
>
> HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks. But 
> RoaringBitmap couldn't be ser/deser with unsafe KryoSerializer.
> We can use below UT to reproduce:
> {code}
>   test("kryo serialization with RoaringBitmap") {
> val bitmap = new RoaringBitmap
> bitmap.add(1787)
> val safeSer = new KryoSerializer(conf).newInstance()
> val bitmap2 : RoaringBitmap = 
> safeSer.deserialize(safeSer.serialize(bitmap))
> assert(bitmap2.equals(bitmap))
> conf.set("spark.kryo.unsafe", "true")
> val unsafeSer = new KryoSerializer(conf).newInstance()
> val bitmap3 : RoaringBitmap = 
> unsafeSer.deserialize(unsafeSer.serialize(bitmap))
> assert(bitmap3.equals(bitmap)) // this will fail
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27206) Using slice method with streaming api's Interval on DStream

2019-03-20 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797153#comment-16797153
 ] 

Gabor Somogyi commented on SPARK-27206:
---

[~aarthipa] Can you please write a mail to the dev list? (d...@spark.apache.org)


> Using slice method with streaming api's Interval on DStream
> ---
>
> Key: SPARK-27206
> URL: https://issues.apache.org/jira/browse/SPARK-27206
> Project: Spark
>  Issue Type: Question
>  Components: DStreams
>Affects Versions: 2.4.0
> Environment: Linux, standalone spark
>Reporter: Aarthi
>Priority: Major
>
> Hi. I am in need to slice a DStream that receives data from a custom receiver 
> (implemented with Spark's Receiver). There are two options to do this
> 1. slice(fromTime: 
> [Time|http://spark.apache.org/docs/2.3.1/api/scala/org/apache/spark/streaming/Time.html],
>  toTime: 
> [Time|http://spark.apache.org/docs/2.3.1/api/scala/org/apache/spark/streaming/Time.html])
> 2. slice(interval: Interval)
> Although the second option is a public method, the Interval class is private. 
> Can you please help me understand how to use this api with 
> org.apache.spark.streaming.Interval ?
> Thanks, Aarthi
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27216) Kryo serialization with RoaringBitmap

2019-03-20 Thread Lantao Jin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lantao Jin updated SPARK-27216:
---
Description: 
HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks, but 
RoaringBitmap cannot be serialized/deserialized with the unsafe KryoSerializer.

The unit test below reproduces the problem:
{code}
  test("kryo serialization with RoaringBitmap") {
val bitmap = new RoaringBitmap
bitmap.add(1787)

val safeSer = new KryoSerializer(conf).newInstance()
val bitmap2 : RoaringBitmap = safeSer.deserialize(safeSer.serialize(bitmap))
assert(bitmap2.equals(bitmap))

conf.set("spark.kryo.unsafe", "true")
val unsafeSer = new KryoSerializer(conf).newInstance()
val bitmap3 : RoaringBitmap = 
unsafeSer.deserialize(unsafeSer.serialize(bitmap))
assert(bitmap3.equals(bitmap)) // this will fail
  }
{code}

  was:HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks. 
But RoaringBitmap couldn't be ser/deser with unsafe KryoSerializer.


> Kryo serialization with RoaringBitmap
> -
>
> Key: SPARK-27216
> URL: https://issues.apache.org/jira/browse/SPARK-27216
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.3, 2.4.0, 3.0.0
>Reporter: Lantao Jin
>Priority: Major
>
> HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks. But 
> RoaringBitmap couldn't be ser/deser with unsafe KryoSerializer.
> We can use below UT to reproduce:
> {code}
>   test("kryo serialization with RoaringBitmap") {
> val bitmap = new RoaringBitmap
> bitmap.add(1787)
> val safeSer = new KryoSerializer(conf).newInstance()
> val bitmap2 : RoaringBitmap = 
> safeSer.deserialize(safeSer.serialize(bitmap))
> assert(bitmap2.equals(bitmap))
> conf.set("spark.kryo.unsafe", "true")
> val unsafeSer = new KryoSerializer(conf).newInstance()
> val bitmap3 : RoaringBitmap = 
> unsafeSer.deserialize(unsafeSer.serialize(bitmap))
> assert(bitmap3.equals(bitmap)) // this will fail
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27216) Kryo serialization with RoaringBitmap

2019-03-20 Thread Lantao Jin (JIRA)
Lantao Jin created SPARK-27216:
--

 Summary: Kryo serialization with RoaringBitmap
 Key: SPARK-27216
 URL: https://issues.apache.org/jira/browse/SPARK-27216
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0, 2.3.3, 3.0.0
Reporter: Lantao Jin


HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks, but 
RoaringBitmap cannot be serialized/deserialized with the unsafe KryoSerializer.
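
For context, a sketch of the configuration under which this shows up (it mirrors 
the unit test posted in this ticket; the values are illustrative, not a 
recommended setting):
{code:scala}
import org.apache.spark.SparkConf

// The unsafe Kryo path is opt-in; map statuses carrying a RoaringBitmap are
// what fail to round-trip once it is enabled.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.unsafe", "true")
{code}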



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27215) Correct the kryo configurations

2019-03-20 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27215:


Assignee: (was: Apache Spark)

> Correct the kryo configurations
> ---
>
> Key: SPARK-27215
> URL: https://issues.apache.org/jira/browse/SPARK-27215
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Lantao Jin
>Priority: Major
>
> {code}
>   val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe")
> .booleanConf
> .createWithDefault(false)
>   val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool")
> .booleanConf
> .createWithDefault(true)
> {code}
> kyro should be kryo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27215) Correct the kryo configurations

2019-03-20 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27215:


Assignee: Apache Spark

> Correct the kryo configurations
> ---
>
> Key: SPARK-27215
> URL: https://issues.apache.org/jira/browse/SPARK-27215
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Lantao Jin
>Assignee: Apache Spark
>Priority: Major
>
> {code}
>   val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe")
> .booleanConf
> .createWithDefault(false)
>   val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool")
> .booleanConf
> .createWithDefault(true)
> {code}
> kyro should be kryo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27215) Correct the kryo configurations

2019-03-20 Thread Lantao Jin (JIRA)
Lantao Jin created SPARK-27215:
--

 Summary: Correct the kryo configurations
 Key: SPARK-27215
 URL: https://issues.apache.org/jira/browse/SPARK-27215
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Lantao Jin


{code}
  val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe")
.booleanConf
.createWithDefault(false)

  val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool")
.booleanConf
.createWithDefault(true)
{code}
kyro should be kryo
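
For clarity, a sketch of how the corrected entries would presumably read 
(illustrative only, not the actual patch):
{code:scala}
// Same builders as above, with the typo fixed in the config keys.
val KRYO_USE_UNSAFE = ConfigBuilder("spark.kryo.unsafe")
  .booleanConf
  .createWithDefault(false)

val KRYO_USE_POOL = ConfigBuilder("spark.kryo.pool")
  .booleanConf
  .createWithDefault(true)
{code}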



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27199) Replace TimeZone by ZoneId in TimestampFormatter API

2019-03-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27199.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24141
[https://github.com/apache/spark/pull/24141]

> Replace TimeZone by ZoneId in TimestampFormatter API
> 
>
> Key: SPARK-27199
> URL: https://issues.apache.org/jira/browse/SPARK-27199
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Minor
> Fix For: 3.0.0
>
>
> Internally, TimestampFormatter implementations use ZoneId but not TimeZone 
> which comes via API. Conversion from TimeZone to ZoneId is not for free. 
> Actually, TimeZone is converted to String, and the String and parsed to 
> ZoneId. The conversion to String can be eliminated if TimestampFormatter 
> would accept ZoneId. And also, TimeZone is converted from String in some 
> cases (JSON options). So, in bad case String -> TimeZone -> String -> ZoneId 
> -> ZoneOffset. The ticket aims to use ZoneId in TimestampFormatter API. We 
> could require ZoneOffset but it is not convenient in most cases because 
> conversion ZoneId to ZoneOffset requires Instant. 
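
As a side note, the conversion chain described above can be illustrated with 
plain Java time APIs (a sketch for context, not Spark code):
{code:scala}
import java.time.ZoneId
import java.util.TimeZone

// Going through TimeZone forces an id-string round trip before a ZoneId exists,
// whereas a ZoneId passed in directly can be used as-is.
val viaTimeZone: ZoneId = TimeZone.getTimeZone("America/Los_Angeles").toZoneId
val direct: ZoneId = ZoneId.of("America/Los_Angeles")
{code}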



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27201) Show full job description on click

2019-03-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-27201:


Assignee: Gengliang Wang

> Show full job description on click
> --
>
> Key: SPARK-27201
> URL: https://issues.apache.org/jira/browse/SPARK-27201
> Project: Spark
>  Issue Type: Task
>  Components: Web UI
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Assignee: Gengliang Wang
>Priority: Major
>
> Previously, in https://github.com/apache/spark/pull/6646 there was an 
> improvement to show full job description after double click.
> I think this is a bit hard to be noticed by some users. I suggest changing 
> the event to one click.
> Also, after the full description is shown, another click should be able to 
> hide the overflow text again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27201) Show full job description on click

2019-03-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27201.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24145
[https://github.com/apache/spark/pull/24145]

> Show full job description on click
> --
>
> Key: SPARK-27201
> URL: https://issues.apache.org/jira/browse/SPARK-27201
> Project: Spark
>  Issue Type: Task
>  Components: Web UI
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Assignee: Gengliang Wang
>Priority: Major
> Fix For: 3.0.0
>
>
> Previously, in https://github.com/apache/spark/pull/6646 there was an 
> improvement to show full job description after double click.
> I think this is a bit hard to be noticed by some users. I suggest changing 
> the event to one click.
> Also, after the full description is shown, another click should be able to 
> hide the overflow text again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Udbhav Agrawal (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797112#comment-16797112
 ] 

Udbhav Agrawal commented on SPARK-27213:


I will try to check this issue.

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Major
>  Labels: distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down in catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. But the 
> catalyst optimizer is using first() on column y_n and then applying the 
> filter.
> Even if the filter was pushed down, the result would have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> ---
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
> first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
> y_n#77|#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], 
> false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, 
> z#76, 

[jira] [Assigned] (SPARK-27199) Replace TimeZone by ZoneId in TimestampFormatter API

2019-03-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-27199:


Assignee: Maxim Gekk

> Replace TimeZone by ZoneId in TimestampFormatter API
> 
>
> Key: SPARK-27199
> URL: https://issues.apache.org/jira/browse/SPARK-27199
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Minor
>
> Internally, TimestampFormatter implementations use ZoneId but not TimeZone 
> which comes via API. Conversion from TimeZone to ZoneId is not for free. 
> Actually, TimeZone is converted to String, and the String and parsed to 
> ZoneId. The conversion to String can be eliminated if TimestampFormatter 
> would accept ZoneId. And also, TimeZone is converted from String in some 
> cases (JSON options). So, in bad case String -> TimeZone -> String -> ZoneId 
> -> ZoneOffset. The ticket aims to use ZoneId in TimestampFormatter API. We 
> could require ZoneOffset but it is not convenient in most cases because 
> conversion ZoneId to ZoneOffset requires Instant. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27089) Loss of precision during decimal division

2019-03-20 Thread Jared Leable (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797111#comment-16797111
 ] 

Jared Leable commented on SPARK-27089:
--

I validated that setting spark.sql.decimalOperations.allowPrecisionLoss=false 
returns the expected value.
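
For reference, a minimal sketch of that check (same session, same query as 
quoted below):
{code:scala}
// Disable precision loss before running the quoted division; sketch only.
spark.conf.set("spark.sql.decimalOperations.allowPrecisionLoss", "false")
val sql = """select cast(cast(3 as decimal(38,14)) / cast(9 as decimal(38,14)) as decimal(38,14)) val"""
spark.sql(sql).show(false)
{code}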

> Loss of precision during decimal division
> -
>
> Key: SPARK-27089
> URL: https://issues.apache.org/jira/browse/SPARK-27089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: ylo0ztlmtusq
>Priority: Major
>
> Spark looses decimal places when dividing decimal numbers.
>  
> Expected behavior (In Spark 2.2.3 or before)
>  
> {code:java}
> scala> val sql = """select cast(cast(3 as decimal(38,14)) / cast(9 as 
> decimal(38,14)) as decimal(38,14)) val"""
> sql: String = select cast(cast(3 as decimal(38,14)) / cast(9 as 
> decimal(38,14)) as decimal(38,14)) val
> scala> spark.sql(sql).show
> 19/03/07 21:23:51 WARN ObjectStore: Failed to get database global_temp, 
> returning NoSuchObjectException
> ++
> | val|
> ++
> |0.33|
> ++
> {code}
>  
> Current behavior (In Spark 2.3.2 and later)
>  
> {code:java}
> scala> val sql = """select cast(cast(3 as decimal(38,14)) / cast(9 as 
> decimal(38,14)) as decimal(38,14)) val"""
> sql: String = select cast(cast(3 as decimal(38,14)) / cast(9 as 
> decimal(38,14)) as decimal(38,14)) val
> scala> spark.sql(sql).show
> ++
> | val|
> ++
> |0.33|
> ++
> {code}
>  
> Seems to caused by {{promote_precision(38, 6) }}
>  
> {code:java}
> scala> spark.sql(sql).explain(true)
> == Parsed Logical Plan ==
> Project [cast((cast(3 as decimal(38,14)) / cast(9 as decimal(38,14))) as 
> decimal(38,14)) AS val#20]
> +- OneRowRelation
> == Analyzed Logical Plan ==
> val: decimal(38,14)
> Project [cast(CheckOverflow((promote_precision(cast(cast(3 as decimal(38,14)) 
> as decimal(38,14))) / promote_precision(cast(cast(9 as decimal(38,14)) as 
> decimal(38,14, DecimalType(38,6)) as decimal(38,14)) AS val#20]
> +- OneRowRelation
> == Optimized Logical Plan ==
> Project [0.33 AS val#20]
> +- OneRowRelation
> == Physical Plan ==
> *(1) Project [0.33 AS val#20]
> +- Scan OneRowRelation[]
> {code}
>  
> Source https://stackoverflow.com/q/55046492



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Priority: Major  (was: Minor)

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Major
>  Labels: distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down in catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. But the 
> catalyst optimizer is using first() on column y_n and then applying the 
> filter.
> Even if the filter was pushed down, the result would have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> ---
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
> first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
> y_n#77|#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], 
> false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, 
> z#76, first#95, valueSet#96|#74, y#75, z#76, 

[jira] [Commented] (SPARK-27169) number of active tasks is negative on executors page

2019-03-20 Thread acupple (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797070#comment-16797070
 ] 

acupple commented on SPARK-27169:
-

Thanks for your suggestion. I will try incrementing the queue size and 
reproducing the case.

> number of active tasks is negative on executors page
> 
>
> Key: SPARK-27169
> URL: https://issues.apache.org/jira/browse/SPARK-27169
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.2
>Reporter: acupple
>Priority: Minor
> Attachments: QQ20190315-102215.png, QQ20190315-102235.png, 
> image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, 
> job_1924.log, stage_3511.log
>
>
> I use spark to process some data in HDFS and HBASE, I use one thread consume 
> message from a queue, and then submit to a thread pool(16 fix size)for spark 
> processor.
> But when run for some time, the active jobs will be thousands, and number of 
> active tasks are negative.
> Actually, these jobs are already done when I check driver logs。
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27100) dag-scheduler-event-loop" java.lang.StackOverflowError

2019-03-20 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797079#comment-16797079
 ] 

Hyukjin Kwon commented on SPARK-27100:
--

Would you be interested in narrowing down the problem, so that people can test 
it against the master branch?

> dag-scheduler-event-loop" java.lang.StackOverflowError
> --
>
> Key: SPARK-27100
> URL: https://issues.apache.org/jira/browse/SPARK-27100
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.1.3, 2.3.3
>Reporter: KaiXu
>Priority: Major
> Attachments: stderr
>
>
> ALS in Spark MLlib causes StackOverflow:
>  /opt/sparkml/spark213/bin/spark-submit  --properties-file 
> /opt/HiBench/report/als/spark/conf/sparkbench/spark.conf --class 
> com.intel.hibench.sparkbench.ml.ALSExample --master yarn-client 
> --num-executors 3 --executor-memory 322g 
> /opt/HiBench/sparkbench/assembly/target/sparkbench-assembly-7.1-SNAPSHOT-dist.jar
>  --numUsers 4 --numProducts 6 --rank 100 --numRecommends 20 
> --numIterations 100 --kryo false --implicitPrefs true --numProductBlocks -1 
> --numUserBlocks -1 --lambda 1.0 hdfs://bdw-slave20:8020/HiBench/ALS/Input
>  
> Exception in thread "dag-scheduler-event-loop" java.lang.StackOverflowError
>  at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1534)
>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>  at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>  at 
> scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
>  at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>  at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>  at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>  at 
> scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
>  at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>  at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>  at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait

2019-03-20 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-27214:
-
Description: 
Currently, Spark's locality wait mechanism is not friendly for large jobs: when 
the number of tasks is large (e.g. 1+) and there is a large number of executors 
(e.g. 2000), executors may be launched on nodes where the locality is not the 
best (not the nodes that hold the HDFS blocks). There are cases where 
`TaskSetManager.lastLaunchTime` is refreshed by tasks that finish within 
`spark.locality.wait` but arrive at a low rate (e.g. one task finishes every 
`spark.locality.wait` seconds), so the locality level is never upgraded and lots 
of pending tasks wait a long time.

In this case, when `spark.dynamicAllocation.enabled=true`, many executors may be 
removed by the driver because they become idle, which finally slows down the 
job.

Actually, we can optimize this with the following formula.

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5

```
maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime *
  localityExecutionGainFactor * probabilityOfNextLocalitySchedule / `spark.locality.wait`

if (numStarvingTasks > maxStarvingTasks) {
  upgrading locality level...
}
```

  was:
Currently, Spark locality wait mechanism is not friendly for large job, when 
tasks is large(e.g. 1+), there are cases when 
`TaskSetManager.lastLaunchTime` is refreshed due to finished tasks within 
`spark.locality.wait` but coming at low rate(e.g. every `spark.locality.wait` 
seconds a task is finished), so locality level would not be upgraded and lots 
of pending tasks will wait a long time. 

In this case, when `spark.dynamicAllocation.enabled=true`, then lots of 
executors may be removed by Driver due to become idle and finally slow down the 
job.

Actually, we can optimize this by following formula:

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5

```

maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * 
localityExecutionGainFactor * probabilityOfNextLocalitySchedule / 
`spark.locality.wait`

if (numStavingTasks > maxStarvingTasks) {

 upgrading locality level...

}



```


> Upgrading locality level when lots of pending tasks have been waiting more 
> than locality.wait
> -
>
> Key: SPARK-27214
> URL: https://issues.apache.org/jira/browse/SPARK-27214
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, Spark locality wait mechanism is not friendly for large job, when 
> number of tasks is large(e.g. 1+)and with a large number of 
> executors(e.g. 2000), executors may be launched on some nodes  where the 
> locality is not the best(not the same nodes hold HDFS blocks). There are 
> cases when `TaskSetManager.lastLaunchTime` is refreshed due to finished tasks 
> within `spark.locality.wait` but coming at low rate(e.g. every 
> `spark.locality.wait` seconds a task is finished), so locality level would 
> not be upgraded and lots of pending tasks will wait a long time. 
> In this case, when `spark.dynamicAllocation.enabled=true`, then lots of 
> executors may be removed by Driver due to become idle and finally slow down 
> the job.
> Actually, we can optimize this by following formula:
> Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
> probabilityOfNextLocalitySchedule=0.5
> ```
> maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * 
> localityExecutionGainFactor * probabilityOfNextLocalitySchedule / 
> `spark.locality.wait`
> if (numStavingTasks > maxStarvingTasks)
> {  upgrading locality level... }
> 
> ```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27169) number of active tasks is negative on executors page

2019-03-20 Thread acupple (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797059#comment-16797059
 ] 

acupple commented on SPARK-27169:
-

I cannot find any "Dropping event" log, but there are some warnings like 
"Dropped events from appStatus".

> number of active tasks is negative on executors page
> 
>
> Key: SPARK-27169
> URL: https://issues.apache.org/jira/browse/SPARK-27169
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.2
>Reporter: acupple
>Priority: Minor
> Attachments: QQ20190315-102215.png, QQ20190315-102235.png, 
> image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, 
> job_1924.log, stage_3511.log
>
>
> I use spark to process some data in HDFS and HBASE, I use one thread consume 
> message from a queue, and then submit to a thread pool(16 fix size)for spark 
> processor.
> But when run for some time, the active jobs will be thousands, and number of 
> active tasks are negative.
> Actually, these jobs are already done when I check driver logs。
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait

2019-03-20 Thread liupengcheng (JIRA)
liupengcheng created SPARK-27214:


 Summary: Upgrading locality level when lots of pending tasks have 
been waiting more than locality.wait
 Key: SPARK-27214
 URL: https://issues.apache.org/jira/browse/SPARK-27214
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0, 2.1.0
Reporter: liupengcheng


Currently, Spark's locality wait mechanism is not friendly for large jobs: when 
the number of tasks is large (e.g. 1+), there are cases where 
`TaskSetManager.lastLaunchTime` is refreshed by tasks that finish within 
`spark.locality.wait` but arrive at a low rate (e.g. one task finishes every 
`spark.locality.wait` seconds), so the locality level is never upgraded and lots 
of pending tasks wait a long time.

In this case, when `spark.dynamicAllocation.enabled=true`, many executors may be 
removed by the driver because they become idle, which finally slows down the 
job.

Actually, we can optimize this with the following formula.

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5

```
maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime *
  localityExecutionGainFactor * probabilityOfNextLocalitySchedule / `spark.locality.wait`

if (numStarvingTasks > maxStarvingTasks) {
  upgrading locality level...
}
```
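
As a purely illustrative sketch of the heuristic (the names and sample values 
below are assumptions, not existing Spark code):
{code:scala}
// Hypothetical values plugged into the formula above.
val numPendingTasks = 10000
val medianOfTaskExecutionTimeMs = 2000.0
val localityExecutionGainFactor = 0.1
val probabilityOfNextLocalitySchedule = 0.5
val localityWaitMs = 3000.0 // spark.locality.wait

val maxStarvingTasks =
  numPendingTasks * medianOfTaskExecutionTimeMs *
    localityExecutionGainFactor * probabilityOfNextLocalitySchedule / localityWaitMs

// Upgrade the locality level once too many tasks have been starving.
def shouldUpgradeLocality(numStarvingTasks: Int): Boolean =
  numStarvingTasks > maxStarvingTasks
{code}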



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27169) number of active tasks is negative on executors page

2019-03-20 Thread shahid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797066#comment-16797066
 ] 

shahid commented on SPARK-27169:


Yes, that means many event drops happen. Can you try increasing the queue 
size? "spark.scheduler.listenerbus.eventqueue.capacity" (default 1) might 
help. If event drops happen, the UI just displays weirdly; I'm not sure there 
is anything we can do from the UI side.

Do you have any reproducible steps for that, so that I can try?

> number of active tasks is negative on executors page
> 
>
> Key: SPARK-27169
> URL: https://issues.apache.org/jira/browse/SPARK-27169
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.2
>Reporter: acupple
>Priority: Minor
> Attachments: QQ20190315-102215.png, QQ20190315-102235.png, 
> image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, 
> job_1924.log, stage_3511.log
>
>
> I use spark to process some data in HDFS and HBASE, I use one thread consume 
> message from a queue, and then submit to a thread pool(16 fix size)for spark 
> processor.
> But when run for some time, the active jobs will be thousands, and number of 
> active tasks are negative.
> Actually, these jobs are already done when I check driver logs。
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27169) number of active tasks is negative on executors page

2019-03-20 Thread shahid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797066#comment-16797066
 ] 

shahid edited comment on SPARK-27169 at 3/20/19 11:26 AM:
--

Yes, that means many event drops happen. Can you try increasing the queue 
size? "spark.scheduler.listenerbus.eventqueue.capacity" (default 1) might 
help. If event drops happen, the UI displays weirdly; I'm not sure there is 
anything we can do from the UI side.

Do you have any reproducible steps for that, so that I can try?


was (Author: shahid):
Yes. that means many event drops happens. Can you try increasing the queue 
size, "spark.scheduler.listenerbus.eventqueue.capacity" (default 1) might 
helps. If event drop happens, then UI display weirdly only, I'm not sure, from 
the UI side we can do anything.

Do you have any reproducible steps for that, so that I can try?

> number of active tasks is negative on executors page
> 
>
> Key: SPARK-27169
> URL: https://issues.apache.org/jira/browse/SPARK-27169
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.2
>Reporter: acupple
>Priority: Minor
> Attachments: QQ20190315-102215.png, QQ20190315-102235.png, 
> image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, 
> job_1924.log, stage_3511.log
>
>
> I use Spark to process some data in HDFS and HBase. I use one thread to consume 
> messages from a queue and then submit to a thread pool (16 fixed size) for the 
> Spark processor.
> But when it runs for some time, the active jobs grow to thousands, and the number 
> of active tasks is negative.
> Actually, these jobs are already done when I check the driver logs.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Description: 
The following code gives unexpected output due to the filter not getting pushed 
down in catalyst optimizer.
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
{panel:title=Output}
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

 
|x|y|z|
|a|123|12.3|
|a|123|12.4|

 
|x|y|z|
|a|123|12.4|
{panel}
Ideally, the second statement should result in an error since the column used 
in the filter is not present in the preceding select statement. But the 
catalyst optimizer is using first() on column y_n and then applying the filter.

Had the filter been pushed down, the result would at least have been accurate.
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
{code}
{panel:title=Output}
 
 == Parsed Logical Plan ==
 Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- AnalysisBarrier
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Optimized Logical Plan ==
 Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Physical Plan ==
 *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
output=[x#74, y#75, z#76|#74, y#75, z#76])
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
output=[x#74, y#75, z#76|#74, y#75, z#76])
 +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
  
 
---
 
  
 == Parsed Logical Plan ==
 'Filter ('y_n = y)
 +- AnalysisBarrier
 +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Optimized Logical Plan ==
 Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Physical Plan ==
 *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
y_n#77|#74, y#75, z#76, y_n#77])
 +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
first#95, valueSet#96|#74, y#75, z#76, first#95, valueSet#96])
 +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
   
{panel}
The second query, i.e. 
*"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
result in an error rather than give wrong output.

  was:
The following code gives unexpected output due to the filter not getting pushed 
down in catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Description: 
The following code gives unexpected output due to the filter not getting pushed 
down in catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
 

 
{panel:title=Output}
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

 
|x|y|z|
|a|123|12.3|
|a|123|12.4|

 
|x|y|z|
|a|123|12.4|
{panel}
 

Ideally, the second statement should result in an error since the column used 
in the filter is not present in the preceding select statement. But the 
catalyst optimizer is using first() on column y_n and then applying the filter.

Had the filter been pushed down, the result would at least have been accurate.

 

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
{code}
{panel:title=Output}
 
 == Parsed Logical Plan ==
 Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- AnalysisBarrier
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Optimized Logical Plan ==
 Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Physical Plan ==
 *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
output=[x#74, y#75, z#76|#74, y#75, z#76])
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
output=[x#74, y#75, z#76|#74, y#75, z#76])
 +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
  
 
---
 
  
 == Parsed Logical Plan ==
 'Filter ('y_n = y)
 +- AnalysisBarrier
 +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Optimized Logical Plan ==
 Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Physical Plan ==
 *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
y_n#77|#74, y#75, z#76, y_n#77])
 +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
first#95, valueSet#96|#74, y#75, z#76, first#95, valueSet#96])
 +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
  
{panel}
The second query, i.e. 
*"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
result in an error rather than give wrong output.

  was:
The following code gives unexpected output due to the filter not getting pushed 
down in catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Description: 
The following code gives unexpected output due to the filter not getting pushed 
down in catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
{panel:title=Output}
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

 
|x|y|z|
|a|123|12.3|
|a|123|12.4|

 
|x|y|z|
|a|123|12.4|
{panel}
 

Ideally, the second statement should result in an error since the column used 
in the filter is not present in the preceding select statement. But the 
catalyst optimizer is using first() on column y_n and then applying the filter.

Had the filter been pushed down, the result would at least have been accurate.

 

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
{code}
{panel:title=Output}
 
 == Parsed Logical Plan ==
 Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- AnalysisBarrier
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Optimized Logical Plan ==
 Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Physical Plan ==
 *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
output=[x#74, y#75, z#76|#74, y#75, z#76])
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
output=[x#74, y#75, z#76|#74, y#75, z#76])
 +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
  
 
---
 
  
 == Parsed Logical Plan ==
 'Filter ('y_n = y)
 +- AnalysisBarrier
 +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Optimized Logical Plan ==
 Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Physical Plan ==
 *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
y_n#77|#74, y#75, z#76, y_n#77])
 +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
first#95, valueSet#96|#74, y#75, z#76, first#95, valueSet#96])
 +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
   
{panel}
The second query, i.e. 
*"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
result in an error rather than give wrong output.

  was:
The following code gives unexpected output due to the filter not getting pushed 
down in catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Description: 
The following code gives unexpected output due to the filter not getting pushed 
down in catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
 

 
{panel:title=Output}
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

 
|x|y|z|
|a|123|12.3|
|a|123|12.4|

 
|x|y|z|
|a|123|12.4|
{panel}
 

Ideally, the second statement should result in an error since the column used 
in the filter is not present in the preceding select statement. But the 
catalyst optimizer is using first() on column y_n and then applying the filter.

Had the filter been pushed down, the result would at least have been accurate.

 

{{df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])}}

{{df.filter("y_n='y'").select('x','y','z').distinct().explain(True)}}

{{df.select('x','y','z').distinct().filter("y_n='y'").explain(True)}}

 

 
  
{panel:title=Output}
 
 == Parsed Logical Plan ==
 Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- AnalysisBarrier
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Optimized Logical Plan ==
 Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Physical Plan ==
 *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
output=[x#74, y#75, z#76|#74, y#75, z#76])
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
output=[x#74, y#75, z#76|#74, y#75, z#76])
 +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
  
 
---
 
  
 == Parsed Logical Plan ==
 'Filter ('y_n = y)
 +- AnalysisBarrier
 +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
 +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Optimized Logical Plan ==
 Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
  
 == Physical Plan ==
 *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
 +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
y_n#77|#74, y#75, z#76, y_n#77])
 +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
first#95, valueSet#96|#74, y#75, z#76, first#95, valueSet#96])
 +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
  
{panel}
 
  

The second query, i.e. 
*"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
result in an error rather than give wrong output.

  was:
The following code gives unexpected output due to the filter not getting pushed 
down in catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])

[jira] [Assigned] (SPARK-27200) History Environment tab must sort Configurations/Properties by default

2019-03-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-27200:


Assignee: Ajith S

> History Environment tab must sort Configurations/Properties by default
> --
>
> Key: SPARK-27200
> URL: https://issues.apache.org/jira/browse/SPARK-27200
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 3.0.0
>Reporter: Ajith S
>Assignee: Ajith S
>Priority: Minor
>
> Environment Page in SparkUI have all the configuration sorted by key. But 
> this is not the case in History server case, to keep UX same, we can have it 
> sorted in history server too



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27200) History Environment tab must sort Configurations/Properties by default

2019-03-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27200.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24143
[https://github.com/apache/spark/pull/24143]

> History Environment tab must sort Configurations/Properties by default
> --
>
> Key: SPARK-27200
> URL: https://issues.apache.org/jira/browse/SPARK-27200
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 3.0.0
>Reporter: Ajith S
>Assignee: Ajith S
>Priority: Minor
> Fix For: 3.0.0
>
>
> Environment Page in SparkUI have all the configuration sorted by key. But 
> this is not the case in History server case, to keep UX same, we can have it 
> sorted in history server too



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27213) Unexpected results due when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)
Rinaz Belhaj created SPARK-27213:


 Summary: Unexpected results due when filter is used after distinct
 Key: SPARK-27213
 URL: https://issues.apache.org/jira/browse/SPARK-27213
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.4.0, 2.3.2
Reporter: Rinaz Belhaj


The following code gives unexpected output due to the filter not getting pushed 
down in catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
 

 
{panel:title=Output}


 
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

|x|y|z|
|a|123|12.3|
|a|123|12.4|

|x|y|z|
|a|123|12.4|

 

 
{panel}
 

Ideally, the second statement should result in an error since the column used 
in the filter is not present in the preceding select statement. But the 
catalyst optimizer is using first() on column y_n and then applying the filter.

Had the filter been pushed down, the result would at least have been accurate.

 

{{df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])}}

{{df.filter("y_n='y'").select('x','y','z').distinct().explain(True)}}

{{df.select('x','y','z').distinct().filter("y_n='y'").explain(True)}}

 

 
 
{panel:title=Output}
 
== Parsed Logical Plan ==
Deduplicate [x#74, y#75, z#76]
+- AnalysisBarrier
+- Project [x#74, y#75, z#76]
+- Filter (y_n#77 = y)
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Analyzed Logical Plan ==
x: string, y: string, z: string
Deduplicate [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76]
+- Filter (y_n#77 = y)
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Optimized Logical Plan ==
Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76]
+- Filter (isnotnull(y_n#77) && (y_n#77 = y))
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Physical Plan ==
*(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, 
z#76])
+- Exchange hashpartitioning(x#74, y#75, z#76, 10)
+- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, 
y#75, z#76])
+- *(1) Project [x#74, y#75, z#76]
+- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
+- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
 
---
 
 
== Parsed Logical Plan ==
'Filter ('y_n = y)
+- AnalysisBarrier
+- Deduplicate [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76]
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Analyzed Logical Plan ==
x: string, y: string, z: string
Project [x#74, y#75, z#76]
+- Filter (y_n#77 = y)
+- Deduplicate [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76, y_n#77]
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Optimized Logical Plan ==
Project [x#74, y#75, z#76]
+- Filter (isnotnull(y_n#77) && (y_n#77 = y))
+- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS 
y_n#77]
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Physical Plan ==
*(3) Project [x#74, y#75, z#76]
+- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
+- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], 
output=[x#74, y#75, z#76, y_n#77])
+- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST], false, 0
+- Exchange hashpartitioning(x#74, y#75, z#76, 10)
+- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, 
false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
+- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST], false, 0
+- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
 
{panel}
 
 

The second query, i.e. 
*"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
result in an error rather than give wrong output.
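
For anyone hitting this before it is addressed: a minimal sketch (Scala API, same toy 
data as above, not from the original report) of the ordering that keeps the filter 
column in scope, so the intended result is produced without relying on the optimizer's 
first() behaviour:

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("distinct-filter-order").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(
  ("a", "123", "12.3", "n"),
  ("a", "123", "12.3", "y"),
  ("a", "123", "12.4", "y")
).toDF("x", "y", "z", "y_n")

// Filter while y_n is still part of the projection, then project and deduplicate.
df.filter("y_n = 'y'").select("x", "y", "z").distinct().show()
{code}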



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Summary: Unexpected results when filter is used after distinct  (was: 
Unexpected results due when filter is used after distinct)

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Minor
>  Labels: distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down in catalyst optimizer.
>  
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
>  
>  
> {panel:title=Output}
>  
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
> |x|y|z|
> |a|123|12.4|
>  
>  
> {panel}
>  
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. But the 
> catalyst optimizer is using first() on column y_n and then applying the 
> filter.
> Had the filter been pushed down, the result would at least have been accurate.
>  
> {{df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])}}
> {{df.filter("y_n='y'").select('x','y','z').distinct().explain(True)}}
> {{df.select('x','y','z').distinct().filter("y_n='y'").explain(True)}}
>  
>  
>  
> {panel:title=Output}
>  
> == Parsed Logical Plan ==
> Deduplicate [x#74, y#75, z#76]
> +- AnalysisBarrier
> +- Project [x#74, y#75, z#76]
> +- Filter (y_n#77 = y)
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Deduplicate [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
> +- Filter (y_n#77 = y)
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Optimized Logical Plan ==
> Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
> +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Physical Plan ==
> *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, 
> z#76])
> +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
> +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, 
> y#75, z#76])
> +- *(1) Project [x#74, y#75, z#76]
> +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
> +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>  
> ---
>  
>  
> == Parsed Logical Plan ==
> 'Filter ('y_n = y)
> +- AnalysisBarrier
> +- Deduplicate [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Project [x#74, y#75, z#76]
> +- Filter (y_n#77 = y)
> +- Deduplicate [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76, y_n#77]
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Optimized Logical Plan ==
> Project [x#74, y#75, z#76]
> +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
> +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Physical Plan ==
> *(3) Project [x#74, y#75, z#76]
> +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
> +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], 
> output=[x#74, y#75, z#76, y_n#77])
> +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST], false, 0
> +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
> +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, 
> false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
> +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST], false, 0
> +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>  
> {panel}
>  
>  
> The second query, i.e. 
> *"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
> result in an error rather than give wrong output.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27212) Eliminate TimeZone to ZoneId conversion in stringToTimestamp

2019-03-20 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27212:


Assignee: (was: Apache Spark)

> Eliminate TimeZone to ZoneId conversion in stringToTimestamp
> 
>
> Key: SPARK-27212
> URL: https://issues.apache.org/jira/browse/SPARK-27212
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maxim Gekk
>Priority: Minor
>
> The stringToTimestamp method of DateTimeUtils (and stringToDate as well) can 
> be called for each row, and the method converts TimeZone to ZoneId each time. 
> The operation is relatively expensive because it does an intermediate conversion 
> to a string: 
> http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/f940e7a48b72/src/share/classes/java/util/TimeZone.java#l547
> The conversion is unnecessary and could be avoided. The ticket aims to change 
> the signature of stringToTimestamp to require a ZoneId as a parameter.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27212) Eliminate TimeZone to ZoneId conversion in stringToTimestamp

2019-03-20 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27212:


Assignee: Apache Spark

> Eliminate TimeZone to ZoneId conversion in stringToTimestamp
> 
>
> Key: SPARK-27212
> URL: https://issues.apache.org/jira/browse/SPARK-27212
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maxim Gekk
>Assignee: Apache Spark
>Priority: Minor
>
> The stringToTimestamp method of DateTimeUtils (and stringToDate as well) can 
> be called for each row, and the method converts TimeZone to ZoneId each time. 
> The operation is relatively expensive because it does an intermediate conversion 
> to a string: 
> http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/f940e7a48b72/src/share/classes/java/util/TimeZone.java#l547
> The conversion is unnecessary and could be avoided. The ticket aims to change 
> the signature of stringToTimestamp to require a ZoneId as a parameter.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27212) Eliminate TimeZone to ZoneId conversion in stringToTimestamp

2019-03-20 Thread Maxim Gekk (JIRA)
Maxim Gekk created SPARK-27212:
--

 Summary: Eliminate TimeZone to ZoneId conversion in 
stringToTimestamp
 Key: SPARK-27212
 URL: https://issues.apache.org/jira/browse/SPARK-27212
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maxim Gekk


The stringToTimestamp method of DateTimeUtils (and stringToDate as well) can be 
called for each row, and the method converts TimeZone to ZoneId each time. The 
operation is relatively expensive because it does an intermediate conversion to a 
string: 
http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/f940e7a48b72/src/share/classes/java/util/TimeZone.java#l547

The conversion is unnecessary and could be avoided. The ticket aims to change the 
signature of stringToTimestamp to require a ZoneId as a parameter.
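
To illustrate the cost pattern, a sketch only; parseTimestamp below stands in for 
Spark's internal stringToTimestamp, which is not called directly here:

{code}
import java.time.{LocalDateTime, ZoneId}
import java.util.TimeZone

// Stand-in for the internal parser, taking a ZoneId as the ticket proposes.
def parseTimestamp(s: String, zoneId: ZoneId): Long =
  LocalDateTime.parse(s).atZone(zoneId).toInstant.toEpochMilli

val tz: TimeZone = TimeZone.getTimeZone("America/Los_Angeles")
val values = Seq("2019-03-20T10:00:00", "2019-03-20T11:30:00")

// Per-row style the ticket wants to avoid: TimeZone.toZoneId goes through the
// zone's string ID internally, so converting on every row adds avoidable work.
val perRow = values.map(v => parseTimestamp(v, tz.toZoneId))

// Proposed style: convert once at the call site and pass the ZoneId down.
val zoneId: ZoneId = tz.toZoneId
val reused = values.map(v => parseTimestamp(v, zoneId))
{code}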



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27169) number of active tasks is negative on executors page

2019-03-20 Thread shahid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797040#comment-16797040
 ] 

shahid commented on SPARK-27169:


Hi, It seems, from the above log we can't say that event drop has happened or 
not. Could you please check in the driver log that "Dropping event from queue" 
phrase is there or not?

> number of active tasks is negative on executors page
> 
>
> Key: SPARK-27169
> URL: https://issues.apache.org/jira/browse/SPARK-27169
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.2
>Reporter: acupple
>Priority: Minor
> Attachments: QQ20190315-102215.png, QQ20190315-102235.png, 
> image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, 
> job_1924.log, stage_3511.log
>
>
> I use Spark to process some data in HDFS and HBase. I use one thread to consume 
> messages from a queue and then submit to a thread pool (16 fixed size) for the 
> Spark processor.
> But when it runs for some time, the active jobs grow to thousands, and the number 
> of active tasks is negative.
> Actually, these jobs are already done when I check the driver logs.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27203) Spark Fails to read a view using CTE (WITH clause) and created via beeline

2019-03-20 Thread Igor Ngouagna (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796958#comment-16796958
 ] 

Igor Ngouagna commented on SPARK-27203:
---

I thought "UNION ALL" was part of the issue here, but no.. if I remove it and 
just select from one table, I still got a compil error:

{code:bash}
Error: Error while compiling statement: FAILED: SemanticException line 1:593 
missing ) at 'NULLS' near 'LAST' in subquery source
line 1:599 missing ) at 'LAST' near 'LAST' in subquery source
line 1:604 missing ) at 'ROWS' near 'LAST' in subquery source in definition of 
VIEW test_cte_view [
SELECT `gen_attr_0` AS `id`, `gen_attr_1` AS `status`, `gen_attr_2` AS `idate` 
FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT 
`gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT `id` AS `gen_attr_0`, 
`status` AS `gen_attr_1`, `idate` AS `gen_attr_2` FROM `db`.`test_cte`) AS 
gen_subquery_0) AS cte INNER JOIN (SELECT `gen_attr_3`, `gen_attr_4`, 
`gen_attr_5` FROM (SELECT `gen_attr_3`, `gen_attr_4`, `gen_attr_5` FROM (SELECT 
gen_subquery_2.`gen_attr_3`, gen_subquery_2.`gen_attr_4`, row_number() OVER 
(PARTITION BY `gen_attr_3` ORDER BY `gen_attr_4` DESC NULLS LAST ROWS BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_5` FROM (SELECT `gen_attr_3`, 
`gen_attr_4` FROM (SELECT `gen_attr_3`, `gen_attr_6`, `gen_attr_4` FROM (SELECT 
`id` AS `gen_attr_3`, `status` AS `gen_attr_6`, `idate` AS `gen_attr_4` FROM 
`db`.`test_cte`) AS gen_subquery_1) AS cte) AS gen_subquery_2) AS 
gen_subquery_3) AS tmp WHERE (`gen_attr_5` = 1)) AS tmp_2 ON ((`gen_attr_0` = 
`gen_attr_3`) AND (`gen_attr_2` = `gen_attr_4`))) AS cte
] used as test_cte_view at Line 1:14 (state=42000,code=4)
{code}

> Spark Fails to read a view using CTE (WITH clause) and created via beeline 
> ---
>
> Key: SPARK-27203
> URL: https://issues.apache.org/jira/browse/SPARK-27203
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Igor Ngouagna
>Priority: Major
>
> Spark fails when trying to read a view whose definition involves a CTE and which 
> was created via Beeline.
> For example, considering the following view, created via Beeline:
> {code:sql}
> create view db.test as 
> with q1 as (select 1 as n)
> select n from q1
> {code}
> When you do
> {code:java}
> spark.sql("select * from db.test").show()
> {code}
> The output is like
> {code}
> 'Table or view not found: q1; line 2 pos 14'
> Traceback (most recent call last):
>   File 
> "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/pyspark.zip/pyspark/sql/session.py",
>  line 545, in sql
> return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
>   File 
> "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/py4j-0.10.4-src.zip/py4j/java_gateway.py",
>  line 1133, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/pyspark.zip/pyspark/sql/utils.py",
>  line 69, in deco
> raise AnalysisException(s.split(': ', 1)[1], stackTrace)
> pyspark.sql.utils.AnalysisException: 'Table or view not found: q1; line 2 pos 
> 14'
> {code}
>  
> *Spark: 2.1.1*
> *Beeline: 1.2.1000*
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27203) Spark Fails to read a view using CTE (WITH clause) and created via beeline

2019-03-20 Thread Igor Ngouagna (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796946#comment-16796946
 ] 

Igor Ngouagna commented on SPARK-27203:
---

Furthermore, I noticed something weird when I tried to create the view from 
Spark.

If the view code is basic (like the one above), everything works well; that is, 
the view is readable from Spark and from Beeline.

However, if the view code is a little more complex and the view is *created via 
spark.sql*, reading it from spark.sql is OK, but *reading it from Beeline 
fails*!

For example considering the following view created via Spark SQL:

View Creation
{code:sql}
spark.sql("CREATE VIEW IF NOT EXISTS db.test_cte_view AS\
  with cte as (select * from db.test_cte union all select * from 
db.test_cte_2),\
   tmp as (SELECT id, idate, ROW_NUMBER() over(PARTITION BY id ORDER BY 
idate desc ) AS row_num from cte)\
  SELECT cte.* from cte\
join (SELECT * from tmp where tmp.row_num =1) tmp_2\
on cte.id = tmp_2.id\
and cte.idate = tmp_2.idate")
{code}
When you do
{code:sql}
beeline> select * from db.test_cte_view;
{code}
the output is like
{code}
Error: Error while compiling statement: FAILED: SemanticException line 1:330 
Failed to recognize predicate 'UNION'. Failed rule: 'identifier' in subquery 
source in definition of VIEW test_cte_view [
SELECT `gen_attr_0` AS `id`, `gen_attr_1` AS `status`, `gen_attr_2` AS `idate` 
FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM ((SELECT 
`gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT `id` AS `gen_attr_0`, 
`status` AS `gen_attr_1`, `idate` AS `gen_attr_2` FROM `db`.`test_cte`) AS 
gen_subquery_0) UNION ALL (SELECT `gen_attr_5`, `gen_attr_6`, `gen_attr_7` FROM 
(SELECT `id` AS `gen_attr_5`, `status` AS `gen_attr_6`, `idate` AS `gen_attr_7` 
FROM `db`.`test_cte_2`) AS gen_subquery_1)) AS cte INNER JOIN (SELECT 
`gen_attr_3`, `gen_attr_4`, `gen_attr_8` FROM (SELECT `gen_attr_3`, 
`gen_attr_4`, `gen_attr_8` FROM (SELECT gen_subquery_4.`gen_attr_3`, 
gen_subquery_4.`gen_attr_4`, row_number() OVER (PARTITION BY `gen_attr_3` ORDER 
BY `gen_attr_4` DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW) AS `gen_attr_8` FROM (SELECT `gen_attr_3`, `gen_attr_4` FROM ((SELECT 
`gen_attr_3`, `gen_attr_9`, `gen_attr_4` FROM (SELECT `id` AS `gen_attr_3`, 
`status` AS `gen_attr_9`, `idate` AS `gen_attr_4` FROM `db`.`test_cte`) AS 
gen_subquery_2) UNION ALL (SELECT `gen_attr_5`, `gen_attr_6`, `gen_attr_7` FROM 
(SELECT `id` AS `gen_attr_5`, `status` AS `gen_attr_6`, `idate` AS `gen_attr_7` 
FROM `db`.`test_cte_2`) AS gen_subquery_3)) AS cte) AS gen_subquery_4) AS 
gen_subquery_5) AS tmp WHERE (`gen_attr_8` = 1)) AS tmp_2 ON ((`gen_attr_0` = 
`gen_attr_3`) AND (`gen_attr_2` = `gen_attr_4`))) AS cte
] used as test_cte_view at Line 1:14 (state=42000,code=4)
{code}
*Tables for test*:
{code:sql}
CREATE TABLE db.test_cte(
id int, 
status string, 
idate date )
  
CREATE TABLE db.test_cte_2(
id int, 
status string, 
idate date )
{code}

> Spark Fails to read a view using CTE (WITH clause) and created via beeline 
> ---
>
> Key: SPARK-27203
> URL: https://issues.apache.org/jira/browse/SPARK-27203
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Igor Ngouagna
>Priority: Major
>
> Spark fails when trying to read a view whose definition involves a CTE and which 
> was created via Beeline.
> For example, considering the following view, created via Beeline:
> {code:sql}
> create view db.test as 
> with q1 as (select 1 as n)
> select n from q1
> {code}
> When you do
> {code:java}
> spark.sql("select * from db.test").show()
> {code}
> The output is like
> {code}
> 'Table or view not found: q1; line 2 pos 14'
> Traceback (most recent call last):
>   File 
> "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/pyspark.zip/pyspark/sql/session.py",
>  line 545, in sql
> return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
>   File 
> "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/py4j-0.10.4-src.zip/py4j/java_gateway.py",
>  line 1133, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/pyspark.zip/pyspark/sql/utils.py",
>  line 69, in deco
> raise AnalysisException(s.split(': ', 1)[1], stackTrace)
> pyspark.sql.utils.AnalysisException: 'Table or view not found: q1; line 2 pos 
> 14'
> {code}
>  
> *Spark: 2.1.1*
> *Beeline: 1.2.1000*
>  



--
This message was sent 

[jira] [Commented] (SPARK-26606) parameters passed in extraJavaOptions are not being picked up

2019-03-20 Thread Mateusz Kaczor (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796924#comment-16796924
 ] 

Mateusz Kaczor commented on SPARK-26606:


I think I've come across a similar issue (at least with the driver; it works fine 
for executors in my case).

I've even posted a question on stackoverflow: 
[https://stackoverflow.com/questions/55244273/spark-2-4-0-submit-in-cluster-mode-why-is-rest-submission-server-required]

 

To sum up the problem:

Spark version 2.4.0, *standalone* cluster.

I'm submitting the app using spark-submit; in all cases exactly the same script is 
used, just changing the master port and deploy mode.

I want to pass some extraJavaOptions to the driver, hence I'm using the 
spark.driver.extraJavaOptions property (--conf 
"spark.driver.extraJavaOptions=-Dfoo=BAR").

I assume the variable was properly passed if it is listed in the System Properties 
table on the Environment tab of the application UI (the one running on port 4040).

Here is what I've observed:

 
||Deploy mode||Deploy to 7077 (regular way)||Deploy to 6066 (via REST)||
|Client|Variables are passed correctly|N/A|
|Cluster|*{color:#ff}Variables are not passed{color}*|Variables are passed 
correctly|

 

All in all, it looks to me like, if we want to pass system properties in cluster 
mode, *we have to* deploy via REST.

I consider it a bug, please correct me if I'm wrong. 
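
For completeness, a tiny sketch (hypothetical, not from the report) of how the driver 
code itself can confirm whether the -D options survived spark-submit in each mode:

{code}
// Prints the system properties discussed in this thread; "foo" matches the
// -Dfoo=BAR example above and "app.env" the one from the original report.
object JavaOptsCheck {
  def main(args: Array[String]): Unit = {
    println(s"foo = ${sys.props.getOrElse("foo", "<not set>")}")
    println(s"app.env = ${sys.props.getOrElse("app.env", "<not set>")}")
  }
}
{code}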

 

> parameters passed in extraJavaOptions are not being picked up 
> --
>
> Key: SPARK-26606
> URL: https://issues.apache.org/jira/browse/SPARK-26606
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit
>Affects Versions: 2.3.1
>Reporter: Ravindra
>Priority: Major
>  Labels: java, spark
>
> driver.extraJavaOptions and executor.extraJavaOptions are not being picked up. 
> Even though I see the parameters being passed fine in the spark launch command, 
> these parameters are not being picked up for some unknown reason. My source code 
> throws an error stating the Java params are empty.
>  
> This is my spark submit command: 
>     output=`spark-submit \
>  --class com.demo.myApp.App \
>  --conf 'spark.executor.extraJavaOptions=-Dapp.env=dev -Dapp.country=US 
> -Dapp.banner=ABC -Doracle.net.tns_admin=/work/artifacts/oracle/current 
> -Djava.security.egd=[file:/dev/./urandom|file:///dev/urandom]' \
>  --conf 'spark.driver.extraJavaOptions=-Dapp.env=dev -Dapp.country=US 
> -Dapp.banner=ABC -Doracle.net.tns_admin=/work/artifacts/oracle/current 
> -Djava.security.egd=[file:/dev/./urandom|file:///dev/urandom]' \
>  --executor-memory "$EXECUTOR_MEMORY" \
>  --executor-cores "$EXECUTOR_CORES" \
>  --total-executor-cores "$TOTAL_CORES" \
>  --driver-memory "$DRIVER_MEMORY" \
>  --deploy-mode cluster \
>  /home/spark/asm//current/myapp-*.jar 2>&1 &`
>  
>  
> Is there any other way I can access the Java params without using 
> extraJavaOptions?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27099) Expose xxHash64 as a flexible 64-bit column hash like `hash`

2019-03-20 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-27099.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24019
[https://github.com/apache/spark/pull/24019]

> Expose xxHash64 as a flexible 64-bit column hash like `hash`
> 
>
> Key: SPARK-27099
> URL: https://issues.apache.org/jira/browse/SPARK-27099
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Huon Wilson
>Assignee: Huon Wilson
>Priority: Major
> Fix For: 3.0.0
>
>
> I’m working on something that requires deterministic randomness, i.e. a row 
> gets the same “random” value no matter the order of the DataFrame. A seeded 
> hash seems to be the perfect way to do this, but the existing hashes have 
> various limitations:
> - hash: 32-bit output (only 4 billion possibilities will result in a lot of 
> collisions for many tables: the birthday paradox implies  >50% chance of at 
> least one for tables larger than 77000 rows, and likely ~1.6 billion 
> collisions in a table of size 4 billion)
> - sha1/sha2/md5: single binary column input, string output
> It seems there’s already support for a 64-bit hash function that can work 
> with an arbitrary number of arbitrary-typed columns (XxHash64), which could 
> be exposed as xxHash64 or xxhash64 (or similar).
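
For illustration, a usage sketch of the two column hashes side by side (assumes 
Spark 3.0+, where this change landed, and that the function is exposed as xxhash64 
in org.apache.spark.sql.functions; the extra literal column is just one way to vary 
the effective seed):

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{hash, lit, xxhash64}

val spark = SparkSession.builder().appName("hash64-example").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("alice", 1L), ("bob", 2L)).toDF("name", "id")

df.select(
  hash($"name", $"id").as("h32"),              // existing 32-bit Murmur3 hash
  // 64-bit xxHash; the description's birthday bound gives ~50% chance of at
  // least one 32-bit collision once a table exceeds about 1.18 * sqrt(2^32) ≈ 77k rows.
  xxhash64(lit(42), $"name", $"id").as("h64")
).show(false)
{code}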



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27099) Expose xxHash64 as a flexible 64-bit column hash like `hash`

2019-03-20 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-27099:
---

Assignee: Huon Wilson

> Expose xxHash64 as a flexible 64-bit column hash like `hash`
> 
>
> Key: SPARK-27099
> URL: https://issues.apache.org/jira/browse/SPARK-27099
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Huon Wilson
>Assignee: Huon Wilson
>Priority: Major
>
> I’m working on something that requires deterministic randomness, i.e. a row 
> gets the same “random” value no matter the order of the DataFrame. A seeded 
> hash seems to be the perfect way to do this, but the existing hashes have 
> various limitations:
> - hash: 32-bit output (only 4 billion possibilities will result in a lot of 
> collisions for many tables: the birthday paradox implies  >50% chance of at 
> least one for tables larger than 77000 rows, and likely ~1.6 billion 
> collisions in a table of size 4 billion)
> - sha1/sha2/md5: single binary column input, string output
> It seems there’s already support for a 64-bit hash function that can work 
> with an arbitrary number of arbitrary-typed columns (XxHash64), which could 
> be exposed as xxHash64 or xxhash64 (or similar).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27211) cast error when select column from Row

2019-03-20 Thread Guiju Zhang (JIRA)
Guiju Zhang created SPARK-27211:
---

 Summary: cast error when select column from Row
 Key: SPARK-27211
 URL: https://issues.apache.org/jira/browse/SPARK-27211
 Project: Spark
  Issue Type: Question
  Components: Java API
Affects Versions: 2.3.1, 2.3.0
Reporter: Guiju Zhang


(1) RawLogPayload has a field: long timestamp

 

(2)

extractedRawTc.printSchema();   // output1

Dataset extractedRawW3cFilled = 
extractedRawW3c.alias("extractedRawW3c")

.join(extractedRawTc.alias("extractedRawTc"), 
functions.col("extractedRawW3c.rawsessionid").equalTo(functions.col("extractedRawTc.rawsessionid")),
 "inner")

.select(functions.col("extractedRawW3c.df_logdatetime"), 
functions.col("extractedRawW3c.rawsessionid"), 
functions.col("extractedRawTc.uid"),

functions.col("extractedRawW3c.time"),functions.col("extractedRawW3c.T"),functions.col("extractedRawW3c.url"),functions.col("extractedRawW3c.wid"),

functions.col("extractedRawW3c.tid"), 
functions.col("extractedRawW3c.fid"),functions.col("extractedRawW3c.string1"),

functions.col("extractedRawW3c.curWindow"), 
*functions.col("extractedRawW3c.timestamp")*)

.as(Encoders.bean(RawLogPayload.class));

extractedRawW3cFilled.printSchema();  // output2

 

(4) cast exception

 

2019-03-20 15:28:31 ERROR CodeGenerator:91 ## failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
103, Column 32: No applicable constructor/method found for actual parameters 
"org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void 
com.microsoft.datamining.spartan.api.core.RawLogPayload.setTimestamp(long)"

org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
103, Column 32: *No applicable constructor/method found for actual parameters 
"org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void 
com.****.****.spartan.api.core.RawLogPayload.setTimestamp(long)"*

at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)

at 
org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8910)

 

 

Output1 extractedRawTc schema

root

 |-- curWindow: string (nullable = true)

 |-- df_logdatetime: string (nullable = true)

 |-- fid: string (nullable = true)

 |-- rawsessionid: string (nullable = true)

 |-- string1: string (nullable = true)

 |-- t: string (nullable = true)

 |-- tid: string (nullable = true)

 |-- time: string (nullable = true)

 |-- *timestamp: long (nullable = true)*

 |-- uid: string (nullable = true)

 |-- url: string (nullable = true)

 |-- wid: string (nullable = true)

 

 

Output2  extractedRawW3cFilled schema

root

 |-- df_logdatetime: string (nullable = true)

 |-- rawsessionid: string (nullable = true)

 |-- uid: string (nullable = true)

 |-- time: string (nullable = true)

 |-- T: string (nullable = true)

 |-- url: string (nullable = true)

 |-- wid: string (nullable = true)

 |-- tid: string (nullable = true)

 |-- fid: string (nullable = true)

 |-- string1: string (nullable = true)

 |-- curWindow: string (nullable = true)

 |-- *timestamp: long (nullable = true)*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27211) cast error when select column from Row

2019-03-20 Thread Guiju Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guiju Zhang updated SPARK-27211:

Description: 
1. First, I have an object RawLogPayload which has a field: long timestamp

2.Then I try to join two Dataset and select some of the columns

Following is the code Snippet

extractedRawTc.printSchema();   // output1

Dataset extractedRawW3cFilled = 
extractedRawW3c.alias("extractedRawW3c")

.join(extractedRawTc.alias("extractedRawTc"), 
functions.col("extractedRawW3c.rawsessionid").equalTo(functions.col("extractedRawTc.rawsessionid")),
 "inner")

.select(functions.col("extractedRawW3c.df_logdatetime"), 
functions.col("extractedRawW3c.rawsessionid"), 
functions.col("extractedRawTc.uid"),

functions.col("extractedRawW3c.time"),functions.col("extractedRawW3c.T"),functions.col("extractedRawW3c.url"),functions.col("extractedRawW3c.wid"),

functions.col("extractedRawW3c.tid"), 
functions.col("extractedRawW3c.fid"),functions.col("extractedRawW3c.string1"),

functions.col("extractedRawW3c.curWindow"), 
*functions.col("extractedRawW3c.timestamp")*)

.as(Encoders.bean(RawLogPayload.class));

extractedRawW3cFilled.printSchema();  // output2

 

3. After running this, it throws the following exception

2019-03-20 15:28:31 ERROR CodeGenerator:91 ## failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
103, Column 32: No applicable constructor/method found for actual parameters 
"org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void 
com.microsoft.datamining.spartan.api.core.RawLogPayload.setTimestamp(long)"

org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
103, Column 32: *No applicable constructor/method found for actual parameters 
"org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void 
com.****.****.spartan.api.core.RawLogPayload.setTimestamp(long)"*

at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)

at 
org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8910)

 

Output1 extractedRawTc schema

root
 |-- curWindow: string (nullable = true)
 |-- df_logdatetime: string (nullable = true)
 |-- fid: string (nullable = true)
 |-- rawsessionid: string (nullable = true)
 |-- string1: string (nullable = true)
 |-- t: string (nullable = true)
 |-- tid: string (nullable = true)
 |-- time: string (nullable = true)
 |-- *timestamp: long (nullable = true)*
 |-- uid: string (nullable = true)
 |-- url: string (nullable = true)
 |-- wid: string (nullable = true)

Output2 extractedRawW3cFilled schema

root
 |-- df_logdatetime: string (nullable = true)
 |-- rawsessionid: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- time: string (nullable = true)
 |-- T: string (nullable = true)
 |-- url: string (nullable = true)
 |-- wid: string (nullable = true)
 |-- tid: string (nullable = true)
 |-- fid: string (nullable = true)
 |-- string1: string (nullable = true)
 |-- curWindow: string (nullable = true)
 |-- *timestamp: long (nullable = true)*

 

My question: the schema of the timestamp column is long, but judging from the 
exception log, the datatype of timestamp becomes UTF8String after the select. 
Why does this happen? Is it a bug? If not, could you point out how to use this 
correctly?

Thanks
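
One thing that may be worth trying while this is investigated (a hedged sketch, not a confirmed fix; it assumes the same imports as the snippet above plus org.apache.spark.sql.types.DataTypes): cast the selected column to LongType explicitly and alias it to the bean property name, so the generated setter call receives a long value.

{code:java}
// Hedged workaround sketch, not a confirmed fix: make the long type explicit
// before applying the bean encoder.
Dataset<RawLogPayload> extractedRawW3cFilled = extractedRawW3c.alias("extractedRawW3c")
    .join(extractedRawTc.alias("extractedRawTc"),
        functions.col("extractedRawW3c.rawsessionid")
            .equalTo(functions.col("extractedRawTc.rawsessionid")),
        "inner")
    .select(functions.col("extractedRawW3c.df_logdatetime"),
        functions.col("extractedRawW3c.rawsessionid"),
        functions.col("extractedRawTc.uid"),
        // ... the remaining columns from the original select ...
        functions.col("extractedRawW3c.timestamp")
            .cast(DataTypes.LongType)      // force LongType explicitly
            .alias("timestamp"))           // keep the bean property name
    .as(Encoders.bean(RawLogPayload.class));
{code}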

  was:
First, I have an object RawLogPayload which has a field: long timestamp

Then I try to join two Datasets and select some of the columns

Following is the code snippet:

extractedRawTc.printSchema();   // output1

Dataset extractedRawW3cFilled = 
extractedRawW3c.alias("extractedRawW3c")

.join(extractedRawTc.alias("extractedRawTc"), 
functions.col("extractedRawW3c.rawsessionid").equalTo(functions.col("extractedRawTc.rawsessionid")),
 "inner")

.select(functions.col("extractedRawW3c.df_logdatetime"), 
functions.col("extractedRawW3c.rawsessionid"), 
functions.col("extractedRawTc.uid"),

functions.col("extractedRawW3c.time"),functions.col("extractedRawW3c.T"),functions.col("extractedRawW3c.url"),functions.col("extractedRawW3c.wid"),

functions.col("extractedRawW3c.tid"), 
functions.col("extractedRawW3c.fid"),functions.col("extractedRawW3c.string1"),

functions.col("extractedRawW3c.curWindow"), 
*functions.col("extractedRawW3c.timestamp")*)

.as(Encoders.bean(RawLogPayload.class));

extractedRawW3cFilled.printSchema();  // output2

 

After running this, it throws the following exception:

2019-03-20 15:28:31 ERROR CodeGenerator:91 ## failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
103, Column 32: No applicable constructor/method found for actual parameters 
"org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void 
com.microsoft.datamining.spartan.api.core.RawLogPayload.setTimestamp(long)"

org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
103, Column 32: 

[jira] [Updated] (SPARK-27211) cast error when select column from Row

2019-03-20 Thread Guiju Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guiju Zhang updated SPARK-27211:

Description: 
First, I have an object RawLogPayload which has a field: long timestamp

Then I try to join two Datasets and select some of the columns

Following is the code snippet:

extractedRawTc.printSchema();   // output1

Dataset extractedRawW3cFilled = extractedRawW3c.alias("extractedRawW3c")
    .join(extractedRawTc.alias("extractedRawTc"),
        functions.col("extractedRawW3c.rawsessionid")
            .equalTo(functions.col("extractedRawTc.rawsessionid")),
        "inner")
    .select(functions.col("extractedRawW3c.df_logdatetime"),
        functions.col("extractedRawW3c.rawsessionid"),
        functions.col("extractedRawTc.uid"),
        functions.col("extractedRawW3c.time"),
        functions.col("extractedRawW3c.T"),
        functions.col("extractedRawW3c.url"),
        functions.col("extractedRawW3c.wid"),
        functions.col("extractedRawW3c.tid"),
        functions.col("extractedRawW3c.fid"),
        functions.col("extractedRawW3c.string1"),
        functions.col("extractedRawW3c.curWindow"),
        functions.col("extractedRawW3c.timestamp"))   // the column in question
    .as(Encoders.bean(RawLogPayload.class));

extractedRawW3cFilled.printSchema();  // output2

 

After running this, it throws the following exception:

2019-03-20 15:28:31 ERROR CodeGenerator:91 ## failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
103, Column 32: No applicable constructor/method found for actual parameters 
"org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void 
com.microsoft.datamining.spartan.api.core.RawLogPayload.setTimestamp(long)"

org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
103, Column 32: *No applicable constructor/method found for actual parameters 
"org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void 
com.****.****.spartan.api.core.RawLogPayload.setTimestamp(long)"*

at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)

at 
org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8910)

 

Output1 extractedRawTc schema

root
 |-- curWindow: string (nullable = true)
 |-- df_logdatetime: string (nullable = true)
 |-- fid: string (nullable = true)
 |-- rawsessionid: string (nullable = true)
 |-- string1: string (nullable = true)
 |-- t: string (nullable = true)
 |-- tid: string (nullable = true)
 |-- time: string (nullable = true)
 |-- *timestamp: long (nullable = true)*
 |-- uid: string (nullable = true)
 |-- url: string (nullable = true)
 |-- wid: string (nullable = true)

Output2 extractedRawW3cFilled schema

root
 |-- df_logdatetime: string (nullable = true)
 |-- rawsessionid: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- time: string (nullable = true)
 |-- T: string (nullable = true)
 |-- url: string (nullable = true)
 |-- wid: string (nullable = true)
 |-- tid: string (nullable = true)
 |-- fid: string (nullable = true)
 |-- string1: string (nullable = true)
 |-- curWindow: string (nullable = true)
 |-- *timestamp: long (nullable = true)*

My question: the schema of the timestamp column is long, but judging from the 
exception log, the datatype of timestamp becomes UTF8String after the select. 
Why does this happen? Is it a bug? If not, could you point out how to use this 
correctly?

Thanks

  was:
(1) RawLogPayload has a field: long timestamp

 

(2)

extractedRawTc.printSchema();   // output1

Dataset extractedRawW3cFilled = 
extractedRawW3c.alias("extractedRawW3c")

.join(extractedRawTc.alias("extractedRawTc"), 
functions.col("extractedRawW3c.rawsessionid").equalTo(functions.col("extractedRawTc.rawsessionid")),
 "inner")

.select(functions.col("extractedRawW3c.df_logdatetime"), 
functions.col("extractedRawW3c.rawsessionid"), 
functions.col("extractedRawTc.uid"),

functions.col("extractedRawW3c.time"),functions.col("extractedRawW3c.T"),functions.col("extractedRawW3c.url"),functions.col("extractedRawW3c.wid"),

functions.col("extractedRawW3c.tid"), 
functions.col("extractedRawW3c.fid"),functions.col("extractedRawW3c.string1"),

functions.col("extractedRawW3c.curWindow"), 
*functions.col("extractedRawW3c.timestamp")*)

.as(Encoders.bean(RawLogPayload.class));

extractedRawW3cFilled.printSchema();  // output2

 

(4) cast exception

 

2019-03-20 15:28:31 ERROR CodeGenerator:91 ## failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
103, Column 32: No applicable constructor/method found for actual parameters 
"org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void 
com.microsoft.datamining.spartan.api.core.RawLogPayload.setTimestamp(long)"

org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
103, Column 32: *No applicable constructor/method found for actual parameters 
"org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void 

[jira] [Assigned] (SPARK-27210) Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted

2019-03-20 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27210:


Assignee: Apache Spark

> Cleanup incomplete output files in ManifestFileCommitProtocol if task is 
> aborted
> 
>
> Key: SPARK-27210
> URL: https://issues.apache.org/jira/browse/SPARK-27210
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Assignee: Apache Spark
>Priority: Minor
>
> Unlike HadoopMapReduceCommitProtocol, ManifestFileCommitProtocol doesn't clean 
> up incomplete output files in either case: when a task is aborted or when the 
> job is aborted.
> HadoopMapReduceCommitProtocol leverages a stage directory to write intermediate 
> files, so once the job is aborted it can simply delete the stage directory to 
> clean up everything. It even puts extra effort into cleaning up intermediate 
> files on the task side if a task is aborted.
> ManifestFileCommitProtocol doesn't do any cleanup; it just maintains the 
> metadata listing which complete output files have been written. It would be 
> better if ManifestFileCommitProtocol made a best effort to clean up: it may not 
> be able to do job-level cleanup since it doesn't use a stage directory, but it 
> can clearly still make a best effort at task-level cleanup.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27210) Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted

2019-03-20 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27210:


Assignee: (was: Apache Spark)

> Cleanup incomplete output files in ManifestFileCommitProtocol if task is 
> aborted
> 
>
> Key: SPARK-27210
> URL: https://issues.apache.org/jira/browse/SPARK-27210
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Priority: Minor
>
> Unlike HadoopMapReduceCommitProtocol, ManifestFileCommitProtocol doesn't clean 
> up incomplete output files in either case: when a task is aborted or when the 
> job is aborted.
> HadoopMapReduceCommitProtocol leverages a stage directory to write intermediate 
> files, so once the job is aborted it can simply delete the stage directory to 
> clean up everything. It even puts extra effort into cleaning up intermediate 
> files on the task side if a task is aborted.
> ManifestFileCommitProtocol doesn't do any cleanup; it just maintains the 
> metadata listing which complete output files have been written. It would be 
> better if ManifestFileCommitProtocol made a best effort to clean up: it may not 
> be able to do job-level cleanup since it doesn't use a stage directory, but it 
> can clearly still make a best effort at task-level cleanup.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27208) RestSubmissionClient only supports http

2019-03-20 Thread Jorge Machado (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796871#comment-16796871
 ] 

Jorge Machado commented on SPARK-27208:
---

Furthermore, I'm getting the following error:
./bin/spark-submit  --class org.apache.spark.examples.SparkPi   --master 
[mesos://host:5050/api] --deploy-mode cluster   --conf 
spark.master.rest.enabled=true  --total-executor-cores 4 --jars 
/home/machjor/spark-2.4.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.0.jar
  
/home/machjor/spark-2.4.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.0.jar
 10
2019-03-18 20:39:31 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
2019-03-18 20:39:31 INFO  RestSubmissionClient:54 - Submitting a request to 
launch an application in [mesos://host:5050/api].
2019-03-18 20:39:32 ERROR RestSubmissionClient:70 - Server responded with error:
Some(Failed to validate master::Call: Expecting 'type' to be present)
2019-03-18 20:39:32 ERROR RestSubmissionClient:70 - Error: Server responded 
with message of unexpected type ErrorResponse.
2019-03-18 20:39:32 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-03-18 20:39:32 INFO  ShutdownHookManager:54 - Deleting directory 
/tmp/spark-259c0e66-c2ab-43b4-90df-

> RestSubmissionClient only supports http
> ---
>
> Key: SPARK-27208
> URL: https://issues.apache.org/jira/browse/SPARK-27208
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.4.0
>Reporter: Jorge Machado
>Priority: Minor
>
> As of now the class RestSubmissionClient does not support https, which fails, 
> for example, if we run the Mesos master with SSL in cluster mode. 
> The spark-submit command fails with: Mesos cluster mode is only supported 
> through the REST submission API
>  
> I created a PR for this which checks whether the given master endpoint can 
> speak SSL before submitting the command.
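
For illustration, a minimal sketch of the kind of probe described above (the class and method names are mine, not taken from the actual PR): attempt a TLS handshake against the master endpoint and fall back to plain http if it fails.

{code:java}
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

// Hedged sketch, not the actual PR: if the handshake succeeds the endpoint
// speaks TLS, so the submission URL can use https; otherwise fall back to http.
final class MasterSslProbe {
    static boolean masterSpeaksSsl(String host, int port, int timeoutMs) {
        try (SSLSocket socket =
                 (SSLSocket) SSLSocketFactory.getDefault().createSocket(host, port)) {
            socket.setSoTimeout(timeoutMs);
            socket.startHandshake();   // throws if the endpoint does not speak TLS
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
{code}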



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27208) RestSubmissionClient only supports http

2019-03-20 Thread Jorge Machado (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796860#comment-16796860
 ] 

Jorge Machado commented on SPARK-27208:
---

I would like to take this.

> RestSubmissionClient only supports http
> ---
>
> Key: SPARK-27208
> URL: https://issues.apache.org/jira/browse/SPARK-27208
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.4.0
>Reporter: Jorge Machado
>Priority: Minor
>
> As of now the class RestSubmissionClient does not support https, which fails, 
> for example, if we run the Mesos master with SSL in cluster mode. 
> The spark-submit command fails with: Mesos cluster mode is only supported 
> through the REST submission API
>  
> I created a PR for this which checks whether the given master endpoint can 
> speak SSL before submitting the command.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27208) RestSubmissionClient only supports http

2019-03-20 Thread Jorge Machado (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Machado updated SPARK-27208:
--
Shepherd: Sean Owen

> RestSubmissionClient only supports http
> ---
>
> Key: SPARK-27208
> URL: https://issues.apache.org/jira/browse/SPARK-27208
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.4.0
>Reporter: Jorge Machado
>Priority: Minor
>
> As of now the class RestSubmissionClient does not support https, which fails, 
> for example, if we run the Mesos master with SSL in cluster mode. 
> The spark-submit command fails with: Mesos cluster mode is only supported 
> through the REST submission API
>  
> I created a PR for this which checks whether the given master endpoint can 
> speak SSL before submitting the command.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27210) Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted

2019-03-20 Thread Jungtaek Lim (JIRA)
Jungtaek Lim created SPARK-27210:


 Summary: Cleanup incomplete output files in 
ManifestFileCommitProtocol if task is aborted
 Key: SPARK-27210
 URL: https://issues.apache.org/jira/browse/SPARK-27210
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.0.0
Reporter: Jungtaek Lim


Unlike HadoopMapReduceCommitProtocol, ManifestFileCommitProtocol doesn't clean 
up incomplete output files in either case: when a task is aborted or when the 
job is aborted.

HadoopMapReduceCommitProtocol leverages a stage directory to write intermediate 
files, so once the job is aborted it can simply delete the stage directory to 
clean up everything. It even puts extra effort into cleaning up intermediate 
files on the task side if a task is aborted.

ManifestFileCommitProtocol doesn't do any cleanup; it just maintains the 
metadata listing which complete output files have been written. It would be 
better if ManifestFileCommitProtocol made a best effort to clean up: it may not 
be able to do job-level cleanup since it doesn't use a stage directory, but it 
can clearly still make a best effort at task-level cleanup.
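
For illustration only, here is a minimal sketch of what best-effort task-level cleanup could look like. Assumptions: a plain Java helper using the Hadoop FileSystem API, where `addedFiles` stands in for whatever per-task bookkeeping the commit protocol keeps; this is not the actual ManifestFileCommitProtocol code.

{code:java}
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hedged sketch, not the actual commit protocol code: on task abort, delete
// every file recorded for the task on a best-effort basis, and never let a
// cleanup failure hide the original abort reason.
final class TaskOutputCleanup {
    static void cleanUpTaskFiles(List<String> addedFiles, Configuration conf) {
        for (String file : addedFiles) {
            Path path = new Path(file);
            try {
                FileSystem fs = path.getFileSystem(conf);
                fs.delete(path, false);   // ignore the result: best effort only
            } catch (IOException e) {
                // swallow: cleanup must not mask the original task failure
            }
        }
    }
}
{code}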



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27160) Incorrect Literal Casting of DecimalType in OrcFilters

2019-03-20 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-27160:
--
Fix Version/s: (was: 3.0.0)

> Incorrect Literal Casting of DecimalType in OrcFilters
> --
>
> Key: SPARK-27160
> URL: https://issues.apache.org/jira/browse/SPARK-27160
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Darcy Shen
>Priority: Blocker
>  Labels: correctness
>
> A DecimalType Literal should not be cast to Long.
> e.g. for `df.filter("x < 3.14")`, assuming df (with x in DecimalType) reads from 
> an ORC table and uses the native ORC reader with predicate push-down enabled, we 
> will push down the `x < 3.14` predicate to the ORC reader via a SearchArgument.
> OrcFilters will construct the SearchArgument, but does not handle the 
> DecimalType correctly.
> The previous implementation constructs `x < 3` from `x < 3.14`.
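
To make the impact concrete, a tiny illustration with a made-up value: a row with x = 3.05 satisfies the original predicate but not the truncated one that gets pushed down, so data that should be read can be skipped.

{code:java}
import java.math.BigDecimal;

// Made-up value for illustration: 3.05 passes `x < 3.14` but fails `x < 3`,
// showing how the truncated literal changes what the pushed-down filter admits.
public final class DecimalPushdownExample {
    public static void main(String[] args) {
        BigDecimal x = new BigDecimal("3.05");
        System.out.println(x.compareTo(new BigDecimal("3.14")) < 0);  // true
        System.out.println(x.compareTo(new BigDecimal("3")) < 0);     // false
    }
}
{code}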



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27160) Incorrect Literal Casting of DecimalType in OrcFilters

2019-03-20 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-27160:
--
Priority: Major  (was: Blocker)

> Incorrect Literal Casting of DecimalType in OrcFilters
> --
>
> Key: SPARK-27160
> URL: https://issues.apache.org/jira/browse/SPARK-27160
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Darcy Shen
>Priority: Major
>  Labels: correctness
>
> A DecimalType Literal should not be cast to Long.
> e.g. for `df.filter("x < 3.14")`, assuming df (with x in DecimalType) reads from 
> an ORC table and uses the native ORC reader with predicate push-down enabled, we 
> will push down the `x < 3.14` predicate to the ORC reader via a SearchArgument.
> OrcFilters will construct the SearchArgument, but does not handle the 
> DecimalType correctly.
> The previous implementation constructs `x < 3` from `x < 3.14`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


