[jira] [Updated] (SPARK-23207) Shuffle+Repartition on a DataFrame could lead to incorrect answers

2018-08-23 Thread Jiang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-23207:
-
Affects Version/s: 1.6.0
   2.0.0
   2.1.0
   2.2.0

> Shuffle+Repartition on a DataFrame could lead to incorrect answers
> ---
>
> Key: SPARK-23207
> URL: https://issues.apache.org/jira/browse/SPARK-23207
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0
>Reporter: Jiang Xingbo
>Assignee: Jiang Xingbo
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.3.0
>
>
> Currently shuffle repartition uses RoundRobinPartitioning, so the generated 
> result is nondeterministic, since the order of the input rows is not 
> deterministic.
> The bug can be triggered when there is a repartition call following a shuffle 
> (which would lead to non-deterministic row ordering), as in the pattern 
> below:
> upstream stage -> repartition stage -> result stage
> (-> indicates a shuffle)
> When one of the executor processes goes down, some tasks of the repartition 
> stage will be retried and generate an inconsistent ordering, and some tasks of 
> the result stage will then be retried and produce different data.
> The following code returns 931532 instead of 1000000:
> {code}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
>   x
> }.repartition(200).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
>     throw new Exception("pkill -f java".!!)
>   }
>   x
> }
> res.distinct().count()
> {code}






[jira] [Commented] (SPARK-25114) RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE

2018-08-21 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587020#comment-16587020
 ] 

Jiang Xingbo commented on SPARK-25114:
--

The PR has been merged to master and 2.3

> RecordBinaryComparator may return wrong result when subtraction between two 
> words is divisible by Integer.MAX_VALUE
> ---
>
> Key: SPARK-25114
> URL: https://issues.apache.org/jira/browse/SPARK-25114
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Jiang Xingbo
>Priority: Blocker
>  Labels: correctness
>
> It is possible for two objects to be unequal and yet be considered equal by 
> RecordBinaryComparator, when the difference between the two long values read 
> from them is divisible by Integer.MAX_VALUE.






[jira] [Created] (SPARK-25161) Fix several bugs in failure handling of barrier execution mode

2018-08-20 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-25161:


 Summary: Fix several bugs in failure handling of barrier execution 
mode
 Key: SPARK-25161
 URL: https://issues.apache.org/jira/browse/SPARK-25161
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


Fix several bugs in the failure handling of barrier execution mode:
* Mark the TaskSet for a barrier stage as zombie when a task attempt fails;
* Multiple barrier task failures from a single barrier stage should not trigger 
multiple stage retries;
* A barrier task failure from a previous failed stage attempt should not trigger 
a stage retry;
* Fail the job when a task from a barrier ResultStage fails.






[jira] [Commented] (SPARK-24941) Add RDDBarrier.coalesce() function

2018-08-14 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579930#comment-16579930
 ] 

Jiang Xingbo commented on SPARK-24941:
--

Shall we add something like `spark.default.parallelism`? It may not be a fixed 
number but rather a fraction, saying that any barrier stage shall launch fewer 
tasks than fraction * totalCores.
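
For illustration, the arithmetic of such a fraction-based cap would look roughly 
like the sketch below (the config name and the numbers are hypothetical, not 
existing Spark settings):
{code}
// Hypothetical fraction-based cap on the number of tasks in a barrier stage.
val totalCores = 64                        // total cores in the cluster (example value)
val barrierTaskFraction = 0.5              // hypothetical config value, not an existing Spark setting
val maxBarrierTasks = (barrierTaskFraction * totalCores).toInt   // at most 32 barrier tasks here
{code}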

> Add RDDBarrier.coalesce() function
> --
>
> Key: SPARK-24941
> URL: https://issues.apache.org/jira/browse/SPARK-24941
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> https://github.com/apache/spark/pull/21758#discussion_r204917245
> The number of partitions from the input data can be unexpectedly large, e.g. 
> if you do
> {code}
> sc.textFile(...).barrier().mapPartitions()
> {code}
> The number of input partitions is based on the HDFS input splits. We shall 
> provide a way in RDDBarrier to enable users to specify the number of tasks in 
> a barrier stage. Maybe something like RDDBarrier.coalesce(numPartitions: Int).






[jira] [Commented] (SPARK-25114) RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE

2018-08-14 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579879#comment-16579879
 ] 

Jiang Xingbo commented on SPARK-25114:
--

I created https://github.com/apache/spark/pull/22101 for this.

> RecordBinaryComparator may return wrong result when subtraction between two 
> words is divisible by Integer.MAX_VALUE
> ---
>
> Key: SPARK-25114
> URL: https://issues.apache.org/jira/browse/SPARK-25114
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Jiang Xingbo
>Priority: Blocker
>  Labels: correctness
>
> It is possible for two objects to be unequal and yet be considered equal by 
> RecordBinaryComparator, when the difference between the two long values read 
> from them is divisible by Integer.MAX_VALUE.






[jira] [Updated] (SPARK-25114) RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE

2018-08-14 Thread Jiang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-25114:
-
Labels: correctness  (was: )

> RecordBinaryComparator may return wrong result when subtraction between two 
> words is divisible by Integer.MAX_VALUE
> ---
>
> Key: SPARK-25114
> URL: https://issues.apache.org/jira/browse/SPARK-25114
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Jiang Xingbo
>Priority: Blocker
>  Labels: correctness
>
> It is possible for two objects to be unequal and yet be considered equal by 
> RecordBinaryComparator, when the difference between the two long values read 
> from them is divisible by Integer.MAX_VALUE.






[jira] [Updated] (SPARK-25114) RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE

2018-08-14 Thread Jiang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-25114:
-
Priority: Blocker  (was: Major)

> RecordBinaryComparator may return wrong result when subtraction between two 
> words is divisible by Integer.MAX_VALUE
> ---
>
> Key: SPARK-25114
> URL: https://issues.apache.org/jira/browse/SPARK-25114
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Jiang Xingbo
>Priority: Blocker
>  Labels: correctness
>
> It is possible for two objects to be unequal and yet be considered equal by 
> RecordBinaryComparator, when the difference between the two long values read 
> from them is divisible by Integer.MAX_VALUE.






[jira] [Created] (SPARK-25114) RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE

2018-08-14 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-25114:


 Summary: RecordBinaryComparator may return wrong result when 
subtraction between two words is divisible by Integer.MAX_VALUE
 Key: SPARK-25114
 URL: https://issues.apache.org/jira/browse/SPARK-25114
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


It is possible for two objects to be unequal and yet be considered equal by 
RecordBinaryComparator, when the difference between the two long values read from 
them is divisible by Integer.MAX_VALUE.
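
A simplified sketch of the failure mode (illustrative only, with made-up method 
names; this is not the actual RecordBinaryComparator source): reducing the 
difference of two 8-byte words modulo Integer.MAX_VALUE and truncating to Int 
returns 0 for unequal words whenever the difference is a multiple of 
Integer.MAX_VALUE.
{code}
// Illustrative sketch only, not Spark source code.
def buggyCompareWords(v1: Long, v2: Long): Int =
  ((v1 - v2) % Int.MaxValue).toInt          // Int.MaxValue == Integer.MAX_VALUE

// Comparing the words directly avoids the problem.
def safeCompareWords(v1: Long, v2: Long): Int =
  if (v1 == v2) 0 else if (v1 > v2) 1 else -1

val a = Int.MaxValue.toLong                 // 2147483647L
val b = 0L
assert(buggyCompareWords(a, b) == 0)        // unequal words wrongly compare as equal
assert(safeCompareWords(a, b) == 1)         // correct ordering
{code}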






[jira] [Created] (SPARK-25095) Python support for BarrierTaskContext

2018-08-12 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-25095:


 Summary: Python support for BarrierTaskContext
 Key: SPARK-25095
 URL: https://issues.apache.org/jira/browse/SPARK-25095
 Project: Spark
  Issue Type: Task
  Components: PySpark
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


Enable calling `BarrierTaskContext.barrier()` from the Python side.






[jira] [Commented] (SPARK-23207) Shuffle+Repartition on a DataFrame could lead to incorrect answers

2018-08-09 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574903#comment-16574903
 ] 

Jiang Xingbo commented on SPARK-23207:
--

This affects version 2.2 and earlier as well. The reason we didn't backport the 
patch is that it can cause a huge performance regression for the `repartition()` 
operation, and the chance of hitting this correctness bug is small. cc 
[~smilegator] [~sameerag]

> Shuffle+Repartition on a DataFrame could lead to incorrect answers
> ---
>
> Key: SPARK-23207
> URL: https://issues.apache.org/jira/browse/SPARK-23207
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jiang Xingbo
>Assignee: Jiang Xingbo
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.3.0
>
>
> Currently shuffle repartition uses RoundRobinPartitioning, so the generated 
> result is nondeterministic, since the order of the input rows is not 
> deterministic.
> The bug can be triggered when there is a repartition call following a shuffle 
> (which would lead to non-deterministic row ordering), as in the pattern 
> below:
> upstream stage -> repartition stage -> result stage
> (-> indicates a shuffle)
> When one of the executor processes goes down, some tasks of the repartition 
> stage will be retried and generate an inconsistent ordering, and some tasks of 
> the result stage will then be retried and produce different data.
> The following code returns 931532 instead of 1000000:
> {code}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
>   x
> }.repartition(200).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
>     throw new Exception("pkill -f java".!!)
>   }
>   x
> }
> res.distinct().count()
> {code}






[jira] [Created] (SPARK-25074) Implement maxNumConcurrentTasks() in MesosFineGrainedSchedulerBackend

2018-08-09 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-25074:


 Summary: Implement maxNumConcurrentTasks() in 
MesosFineGrainedSchedulerBackend
 Key: SPARK-25074
 URL: https://issues.apache.org/jira/browse/SPARK-25074
 Project: Spark
  Issue Type: Task
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


We added a new method `maxNumConcurrentTasks()` to `SchedulerBackend` to get 
the max number of tasks that can currently be launched concurrently. However, the 
method is not implemented in `MesosFineGrainedSchedulerBackend`, so submitting a 
job containing a barrier stage will always fail fast when running with the 
`MesosFineGrainedSchedulerBackend` resource manager.






[jira] [Created] (SPARK-25045) Make `RDDBarrier.mapPartitions` similar to `RDD.mapPartitions`

2018-08-07 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-25045:


 Summary: Make `RDDBarrier.mapPartitions` similar to 
`RDD.mapPartitions`
 Key: SPARK-25045
 URL: https://issues.apache.org/jira/browse/SPARK-25045
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


The signature of the function passed to `RDDBarrier.mapPartitions()` is different 
from that of `RDD.mapPartitions`: the latter doesn't take a TaskContext. We 
shall make the function signatures the same to avoid confusion and misuse.







[jira] [Created] (SPARK-25030) SparkSubmit will not return result if the mainClass submitted creates a Timer()

2018-08-06 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-25030:


 Summary: SparkSubmit will not return result if the mainClass 
submitted creates a Timer()
 Key: SPARK-25030
 URL: https://issues.apache.org/jira/browse/SPARK-25030
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.1
Reporter: Jiang Xingbo


Creating a Timer() in the main class submitted to SparkSubmit makes it unable to 
fetch the result; the issue is very easy to reproduce.
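
A minimal sketch of a main class that can reproduce this kind of hang, assuming 
the cause is the non-daemon timer thread (the class name and job body below are 
made up):
{code}
// java.util.Timer() starts a non-daemon thread by default, so the JVM keeps running
// after main() returns and spark-submit never sees the driver process exit.
import java.util.Timer

object TimerApp {
  def main(args: Array[String]): Unit = {
    val timer = new Timer()   // non-daemon timer thread keeps the JVM alive
    // ... run the Spark job and print the result here ...
    // timer.cancel() is never called, so the process does not terminate on its own.
  }
}
{code}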






[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-08-04 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569202#comment-16569202
 ] 

Jiang Xingbo commented on SPARK-24375:
--

[~mridulm80] You are right that currently we are not able to identify which 
barrier it is until we have actually executed the barrier() function. We've 
thought hard about the issue and don't feel we can make it work unless we force 
users to explicitly set a number in each barrier() call (which is not a good 
idea, because it adds more burden to managing the code).

The current decision is that we don't distinguish barrier() calls from the same 
task; users are responsible for ensuring that the same number of barrier() calls 
happens in all possible code branches, otherwise the job may hang or fail with a 
SparkException after the timeout.

We've added the following message to the description of 
`BarrierTaskContext.barrier()`; I hope it is useful:
{quote}
   * CAUTION! In a barrier stage, each task must have the same number of 
barrier() calls, in all
   * possible code branches. Otherwise, you may get the job hanging or a 
SparkException after
   * timeout. 
{quote}
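
A short sketch of that rule, assuming the Spark 2.4 barrier API (the RDD and the 
branch described in the comments are made up):
{code}
// Every task in a barrier stage must reach the same number of barrier() calls.
import org.apache.spark.BarrierTaskContext

// sc is an existing SparkContext.
val rdd = sc.parallelize(1 to 1000, 4)
rdd.barrier().mapPartitions { iter =>
  val context = BarrierTaskContext.get()
  // BAD: a data-dependent branch such as `if (iter.hasNext) context.barrier()` lets tasks
  // with empty partitions skip the call, so the remaining tasks hang until the timeout.
  // GOOD: call barrier() unconditionally on every code path.
  context.barrier()
  iter
}.count()
{code}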

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.






[jira] [Created] (SPARK-25017) Add test suite for ContextBarrierState

2018-08-03 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-25017:


 Summary: Add test suite for ContextBarrierState
 Key: SPARK-25017
 URL: https://issues.apache.org/jira/browse/SPARK-25017
 Project: Spark
  Issue Type: Test
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


We shall be able to add unit tests for ContextBarrierState with a mocked 
RpcCallContext. Currently it is only covered by end-to-end tests in 
`BarrierTaskContextSuite`.






[jira] [Commented] (SPARK-24884) Implement regexp_extract_all

2018-08-02 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16566752#comment-16566752
 ] 

Jiang Xingbo commented on SPARK-24884:
--

You don't need to be assigned; just prepare and submit a PR and include the JIRA 
number (SPARK-24884) in the title. 

> Implement regexp_extract_all
> 
>
> Key: SPARK-24884
> URL: https://issues.apache.org/jira/browse/SPARK-24884
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Nick Nicolini
>Priority: Major
>
> I've recently hit many cases of regexp parsing where we need to match on 
> something that is always arbitrary in length; for example, a text block that 
> looks something like:
> {code:java}
> AAA:WORDS|
> BBB:TEXT|
> MSG:ASDF|
> MSG:QWER|
> ...
> MSG:ZXCV|{code}
> Where I need to pull out all values between "MSG:" and "|", which can occur 
> in each instance between 1 and n times. I cannot reliably use the existing 
> {{regexp_extract}} method since the number of occurrences is always 
> arbitrary, and while I can write a UDF to handle this it'd be great if this 
> was supported natively in Spark.
> Perhaps we can implement something like {{regexp_extract_all}} as 
> [Presto|https://prestodb.io/docs/current/functions/regexp.html] and 
> [Pig|https://pig.apache.org/docs/latest/api/org/apache/pig/builtin/REGEX_EXTRACT_ALL.html]
>  have?
>  
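
Until such a function exists, one possible workaround is a Scala UDF along the 
lines sketched below (an assumption for illustration, not part of Spark; the 
column name is made up):
{code}
// Extract every value between "MSG:" and "|" from a text block.
import org.apache.spark.sql.functions.udf

val extractAllMsgs = udf { (s: String) =>
  val pattern = "MSG:([^|]*)\\|".r
  Option(s).toSeq.flatMap(str => pattern.findAllMatchIn(str).map(_.group(1)).toSeq)
}
// usage: df.select(extractAllMsgs(df("text_block")))
{code}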






[jira] [Commented] (SPARK-24817) Implement BarrierTaskContext.barrier()

2018-08-01 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16566278#comment-16566278
 ] 

Jiang Xingbo commented on SPARK-24817:
--

Actually the current implementation of the _barrier_ function doesn't require 
communication between executors; all executors just talk to a 
_BarrierCoordinator_ that lives in the driver. But to allow launching ML 
workloads we do need to enable executors to communicate with each other 
directly; IIUC that shall be investigated under SPARK-24724. Maybe [~mengxr] 
can provide more context here.

> Implement BarrierTaskContext.barrier()
> --
>
> Key: SPARK-24817
> URL: https://issues.apache.org/jira/browse/SPARK-24817
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> Implement BarrierTaskContext.barrier(), to support global sync between all 
> the tasks in a barrier stage. The global sync shall finish immediately once 
> all tasks in the same barrier stage reach the same barrier.






[jira] [Resolved] (SPARK-24582) Design: Barrier execution mode

2018-07-30 Thread Jiang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo resolved SPARK-24582.
--
Resolution: Fixed

> Design: Barrier execution mode
> --
>
> Key: SPARK-24582
> URL: https://issues.apache.org/jira/browse/SPARK-24582
> Project: Spark
>  Issue Type: Story
>  Components: ML, Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> [~jiangxb1987] and [~cloud_fan] outlined a design sketch in SPARK-24375, 
> which covers some basic scenarios. This story is for a formal design of the 
> barrier execution mode.






[jira] [Resolved] (SPARK-24581) Design: BarrierTaskContext.barrier()

2018-07-30 Thread Jiang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo resolved SPARK-24581.
--
Resolution: Fixed

> Design: BarrierTaskContext.barrier()
> 
>
> Key: SPARK-24581
> URL: https://issues.apache.org/jira/browse/SPARK-24581
> Project: Spark
>  Issue Type: Story
>  Components: ML, Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> We need to provide a communication barrier function to users to help 
> coordinate tasks within a barrier stage. This is very similar to MPI_Barrier 
> function in MPI. This story is for its design.
>  
> Requirements:
>  * Low-latency. The tasks should be unblocked soon after all tasks have 
> reached this barrier. The latency is more important than CPU cycles here.
>  * Support unlimited timeout with proper logging. For DL tasks, it might take 
> very long to converge, we should support unlimited timeout with proper 
> logging. So users know why a task is waiting.






[jira] [Created] (SPARK-24954) Fail fast on job submit if run a barrier stage with dynamic resource allocation enabled

2018-07-27 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24954:


 Summary: Fail fast on job submit if run a barrier stage with 
dynamic resource allocation enabled
 Key: SPARK-24954
 URL: https://issues.apache.org/jira/browse/SPARK-24954
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


Since we explicitly listed "Support running barrier stages with dynamic resource 
allocation" as a non-goal in the design doc, we shall fail fast on job submit if 
a barrier stage is run with dynamic resource allocation enabled, to avoid some 
confusing behaviors (refer to SPARK-24942 for some examples).






[jira] [Updated] (SPARK-24942) Improve cluster resource management with jobs containing barrier stage

2018-07-27 Thread Jiang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-24942:
-
Target Version/s: 3.0.0

> Improve cluster resource management with jobs containing barrier stage
> --
>
> Key: SPARK-24942
> URL: https://issues.apache.org/jira/browse/SPARK-24942
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> https://github.com/apache/spark/pull/21758#discussion_r205652317
> We shall improve cluster resource management to address the following issues:
> - With dynamic resource allocation enabled, it may happen that we acquire 
> some executors (but not enough to launch all the tasks in a barrier stage) 
> and later release them when the executor idle timeout expires, and then acquire 
> them again.
> - There can be deadlock with two concurrent applications. Each application 
> may acquire some resources, but not enough to launch all the tasks in a 
> barrier stage. And after hitting the idle timeout and releasing them, they 
> may acquire resources again, but just continually trade resources between 
> each other.






[jira] [Updated] (SPARK-24941) Add RDDBarrier.coalesce() function

2018-07-27 Thread Jiang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-24941:
-
Target Version/s: 3.0.0

> Add RDDBarrier.coalesce() function
> --
>
> Key: SPARK-24941
> URL: https://issues.apache.org/jira/browse/SPARK-24941
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> https://github.com/apache/spark/pull/21758#discussion_r204917245
> The number of partitions from the input data can be unexpectedly large, e.g. 
> if you do
> {code}
> sc.textFile(...).barrier().mapPartitions()
> {code}
> The number of input partitions is based on the HDFS input splits. We shall 
> provide a way in RDDBarrier to enable users to specify the number of tasks in 
> a barrier stage. Maybe something like RDDBarrier.coalesce(numPartitions: Int).






[jira] [Created] (SPARK-24942) Improve cluster resource management with jobs containing barrier stage

2018-07-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24942:


 Summary: Improve cluster resource management with jobs containing 
barrier stage
 Key: SPARK-24942
 URL: https://issues.apache.org/jira/browse/SPARK-24942
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Jiang Xingbo


https://github.com/apache/spark/pull/21758#discussion_r205652317

We shall improve cluster resource management to address the following issues:
- With dynamic resource allocation enabled, it may happen that we acquire some 
executors (but not enough to launch all the tasks in a barrier stage) and later 
release them when the executor idle timeout expires, and then acquire them again.
- There can be deadlock with two concurrent applications. Each application may 
acquire some resources, but not enough to launch all the tasks in a barrier 
stage. And after hitting the idle timeout and releasing them, they may acquire 
resources again, but just continually trade resources between each other.






[jira] [Created] (SPARK-24941) Add RDDBarrier.coalesce() function

2018-07-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24941:


 Summary: Add RDDBarrier.coalesce() function
 Key: SPARK-24941
 URL: https://issues.apache.org/jira/browse/SPARK-24941
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Jiang Xingbo


https://github.com/apache/spark/pull/21758#discussion_r204917245

The number of partitions from the input data can be unexpectedly large, e.g. if 
you do
{code}
sc.textFile(...).barrier().mapPartitions()
{code}
The number of input partitions is based on the HDFS input splits. We shall 
provide a way in RDDBarrier to enable users to specify the number of tasks in a 
barrier stage. Maybe something like RDDBarrier.coalesce(numPartitions: Int).
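
A hypothetical usage sketch of the proposed API (RDDBarrier.coalesce does not 
exist at the time of writing; the path and partition count are made up, and the 
proposed call is shown commented out):
{code}
// sc is an existing SparkContext; the partition count of textFile follows the HDFS splits.
val input = sc.textFile("hdfs:///path/to/input")
val result = input
  .barrier()
  // .coalesce(100)            // proposed in this ticket: cap the barrier stage at 100 tasks
  .mapPartitions(iter => iter)
{code}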






[jira] [Commented] (SPARK-24581) Design: BarrierTaskContext.barrier()

2018-07-24 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554409#comment-16554409
 ] 

Jiang Xingbo commented on SPARK-24581:
--

Design doc: 
https://docs.google.com/document/d/1r07-vU5JTH6s1jJ6azkmK0K5it6jwpfO6b_K3mJmxR4/edit?usp=sharing

> Design: BarrierTaskContext.barrier()
> 
>
> Key: SPARK-24581
> URL: https://issues.apache.org/jira/browse/SPARK-24581
> Project: Spark
>  Issue Type: Story
>  Components: ML, Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> We need to provide a communication barrier function to users to help 
> coordinate tasks within a barrier stage. This is very similar to MPI_Barrier 
> function in MPI. This story is for its design.
>  
> Requirements:
>  * Low-latency. The tasks should be unblocked soon after all tasks have 
> reached this barrier. The latency is more important than CPU cycles here.
>  * Support unlimited timeout with proper logging. For DL tasks, it might take 
> very long to converge, we should support unlimited timeout with proper 
> logging. So users know why a task is waiting.






[jira] [Resolved] (SPARK-24340) Clean up non-shuffle disk block manager files following executor death

2018-07-21 Thread Jiang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo resolved SPARK-24340.
--
Resolution: Fixed

> Clean up non-shuffle disk block manager files following executor death
> --
>
> Key: SPARK-24340
> URL: https://issues.apache.org/jira/browse/SPARK-24340
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> Currently we only clean up local folders when an application is removed, and we 
> don't clean up non-shuffle files, such as temporary shuffle blocks, cached 
> RDD/broadcast blocks, spill files, etc., and this can cause disk space leaks 
> when executors periodically die and are replaced.
> To avoid this source of disk space leaks, we can clean up executor disk store 
> files, except for shuffle index and data files, when an executor finishes.
>  






[jira] [Commented] (SPARK-24340) Clean up non-shuffle disk block manager files following executor death

2018-07-21 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551714#comment-16551714
 ] 

Jiang Xingbo commented on SPARK-24340:
--

Thanks~

> Clean up non-shuffle disk block manager files following executor death
> --
>
> Key: SPARK-24340
> URL: https://issues.apache.org/jira/browse/SPARK-24340
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> Currently we only clean up local folders when an application is removed, and we 
> don't clean up non-shuffle files, such as temporary shuffle blocks, cached 
> RDD/broadcast blocks, spill files, etc., and this can cause disk space leaks 
> when executors periodically die and are replaced.
> To avoid this source of disk space leaks, we can clean up executor disk store 
> files, except for shuffle index and data files, when an executor finishes.
>  






[jira] [Created] (SPARK-24877) Ignore the task completion event from a zombie barrier task

2018-07-20 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24877:


 Summary: Ignore the task completion event from a zombie barrier 
task
 Key: SPARK-24877
 URL: https://issues.apache.org/jira/browse/SPARK-24877
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Jiang Xingbo


Currently we abort the barrier stage if a zombie barrier task can't be killed, to 
prevent data correctness issues. We can improve the behavior to let a zombie 
barrier task continue running while not being able to interact with other barrier 
tasks (which may belong to a different stage attempt), and ignore the task 
completion event from a zombie barrier task.






[jira] [Created] (SPARK-24874) Allow hybrid of both barrier tasks and regular tasks in a stage

2018-07-20 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24874:


 Summary: Allow hybrid of both barrier tasks and regular tasks in a 
stage
 Key: SPARK-24874
 URL: https://issues.apache.org/jira/browse/SPARK-24874
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Jiang Xingbo


Currently we only allow barrier tasks in a barrier stage; however, consider the 
following job:
{code}
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(1 to 100, 10)   // 10 regular partitions
val rdd2 = sc.parallelize(1 to 1000, 20).barrier().mapPartitions((it, ctx) => it)   // 20 barrier partitions
val rdd = rdd1.union(rdd2).mapPartitions(t => t)
{code}

Now it requires 30 free slots to run `rdd.collect()`. Actually, we could launch 
regular tasks to collect the data from rdd1's partitions, since those are not 
required to be launched together. If we could do that, we would only need 20 free 
slots to run `rdd.collect()`.






[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-07-18 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548784#comment-16548784
 ] 

Jiang Xingbo commented on SPARK-24375:
--

{quote}Is the 'barrier' logic pluggable ? Instead of only being a global sync 
point.
{quote}
The barrier() function is quite like the 
[MPI_Barrier|https://www.mpich.org/static/docs/v3.2.1/www/www3/MPI_Barrier.html]
 function in MPI; its major purpose is to provide a way to do a global sync 
between barrier tasks. I'm not sure whether we have a plan to support pluggable 
logic for now; do you have a case in hand that requires a pluggable barrier()?
{quote}Dynamic resource allocation (dra) triggers allocation of additional 
resources based on pending tasks - hence the comment We may add a check of 
total available slots before scheduling tasks from a barrier stage taskset. 
does not necessarily work in that context.
{quote}
Supporting a barrier stage with dynamic resource allocation is a non-goal 
here; however, we can improve the behavior to integrate better with DRA in 
Spark 3.0.
{quote}Currently DRA in spark uniformly allocates resources - are we 
envisioning changes as part of this effort to allocate heterogeneous executor 
resources based on pending tasks (at least initially for barrier support for 
gpu's) ?
{quote}
There is another ongoing SPIP, SPARK-24615, to add accelerator-aware task 
scheduling to Spark; I think we shall deal with the above issue within that 
topic.
{quote}In face of exceptions, some tasks will wait on barrier 2 and others on 
barrier 1 : causing issues.{quote}
It's not desired behavior to silently catch an exception thrown by 
TaskContext.barrier(). However, in case this really happens, we can detect it 
because we keep an `epoch` on both the driver side and the executor side; more 
details will go into the design doc of BarrierTaskContext.barrier(), SPARK-24581.
{quote}Can you elaborate more on leveraging TaskContext.localProperties ? Is 
it expected to be sync'ed after 'barrier' returns ? What guarantees are we 
expecting to provide ?{quote}
We update the localProperties on the driver, and in executors you shall be able 
to fetch the updated values through TaskContext; it should not be coupled with 
the `barrier()` function.

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.






[jira] [Created] (SPARK-24824) Make Spark task speculation a per-stage config

2018-07-16 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24824:


 Summary: Make Spark task speculation a per-stage config
 Key: SPARK-24824
 URL: https://issues.apache.org/jira/browse/SPARK-24824
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Jiang Xingbo


Make Spark task speculation a per-stage config, so we can explicitly disable 
task speculation for a barrier stage.






[jira] [Created] (SPARK-24823) Cancel a job that contains barrier stage(s) if the barrier tasks don't get launched within a configured time

2018-07-16 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24823:


 Summary: Cancel a job that contains barrier stage(s) if the 
barrier tasks don't get launched within a configured time
 Key: SPARK-24823
 URL: https://issues.apache.org/jira/browse/SPARK-24823
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Jiang Xingbo


Cancel a job that contains barrier stage(s) if the barrier tasks don't get 
launched within a configured time.






[jira] [Created] (SPARK-24822) Python support for barrier execution mode

2018-07-16 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24822:


 Summary: Python support for barrier execution mode
 Key: SPARK-24822
 URL: https://issues.apache.org/jira/browse/SPARK-24822
 Project: Spark
  Issue Type: New Feature
  Components: PySpark, Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


Enable launching a job containing barrier stage(s) from PySpark.






[jira] [Created] (SPARK-24821) Fail fast when submitted job compute on a subset of all the partitions for a barrier stage

2018-07-16 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24821:


 Summary: Fail fast when submitted job compute on a subset of all 
the partitions for a barrier stage
 Key: SPARK-24821
 URL: https://issues.apache.org/jira/browse/SPARK-24821
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


Detect when SparkContext.runJob() launches a barrier stage on a subset of all the 
partitions; one example is the `first()` operation.






[jira] [Created] (SPARK-24820) Fail fast when submitted job contains PartitionPruningRDD in a barrier stage

2018-07-16 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24820:


 Summary: Fail fast when submitted job contains PartitionPruningRDD 
in a barrier stage
 Key: SPARK-24820
 URL: https://issues.apache.org/jira/browse/SPARK-24820
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


Detect when SparkContext.runJob() launches a barrier stage that includes a 
PartitionPruningRDD.






[jira] [Created] (SPARK-24819) Fail fast when no enough slots to launch the barrier stage on job submitted

2018-07-16 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24819:


 Summary: Fail fast when no enough slots to launch the barrier 
stage on job submitted
 Key: SPARK-24819
 URL: https://issues.apache.org/jira/browse/SPARK-24819
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


Check all the barrier stages when a job is submitted, to see whether any barrier 
stage requires more slots (to be able to launch all the barrier tasks in the 
same stage together) than there are currently active slots in the cluster. If the 
job requires more slots than are available (counting both busy and free slots), 
fail the job on submit.






[jira] [Created] (SPARK-24818) Ensure all the barrier tasks in the same stage are launched together

2018-07-16 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24818:


 Summary: Ensure all the barrier tasks in the same stage are 
launched together
 Key: SPARK-24818
 URL: https://issues.apache.org/jira/browse/SPARK-24818
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


When some executors/hosts are blacklisted, it may happen that only a part of 
the tasks in the same barrier stage can be launched. We shall detect the case 
and revert the allocated resource offers.






[jira] [Created] (SPARK-24817) Implement BarrierTaskContext.barrier()

2018-07-16 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24817:


 Summary: Implement BarrierTaskContext.barrier()
 Key: SPARK-24817
 URL: https://issues.apache.org/jira/browse/SPARK-24817
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


Implement BarrierTaskContext.barrier(), to support global sync between all the 
tasks in a barrier stage. The global sync shall finish immediately once all 
tasks in the same barrier stage reach the same barrier.






[jira] [Updated] (SPARK-24795) Implement barrier execution mode

2018-07-12 Thread Jiang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-24795:
-
Description: 
Implement barrier execution mode, as described in SPARK-24582.
Include all the API changes and the basic implementation (except for 
BarrierTaskContext.barrier()).

> Implement barrier execution mode
> 
>
> Key: SPARK-24795
> URL: https://issues.apache.org/jira/browse/SPARK-24795
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> Implement barrier execution mode, as described in SPARK-24582.
> Include all the API changes and the basic implementation (except for 
> BarrierTaskContext.barrier()).






[jira] [Created] (SPARK-24795) Implement barrier execution mode

2018-07-12 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24795:


 Summary: Implement barrier execution mode
 Key: SPARK-24795
 URL: https://issues.apache.org/jira/browse/SPARK-24795
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo









[jira] [Commented] (SPARK-24582) Design: Barrier execution mode

2018-07-08 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536331#comment-16536331
 ] 

Jiang Xingbo commented on SPARK-24582:
--

Design doc: 
[https://docs.google.com/document/d/1GvcYR6ZFto3dOnjfLjZMtTezX0W5VYN9w1l4-tQXaZk/edit#|https://docs.google.com/document/d/1GvcYR6ZFto3dOnjfLjZMtTezX0W5VYN9w1l4-tQXaZk/edit]

> Design: Barrier execution mode
> --
>
> Key: SPARK-24582
> URL: https://issues.apache.org/jira/browse/SPARK-24582
> Project: Spark
>  Issue Type: Story
>  Components: ML, Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> [~jiangxb1987] and [~cloud_fan] outlined a design sketch in SPARK-24375, 
> which covers some basic scenarios. This story is for a formal design of the 
> barrier execution mode.






[jira] [Created] (SPARK-24564) Add test suite for RecordBinaryComparator

2018-06-14 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24564:


 Summary: Add test suite for RecordBinaryComparator
 Key: SPARK-24564
 URL: https://issues.apache.org/jira/browse/SPARK-24564
 Project: Spark
  Issue Type: Test
  Components: SQL
Affects Versions: 2.4.0
Reporter: Jiang Xingbo









[jira] [Comment Edited] (SPARK-24552) Task attempt numbers are reused when stages are retried

2018-06-13 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511695#comment-16511695
 ] 

Jiang Xingbo edited comment on SPARK-24552 at 6/13/18 9:47 PM:
---

IIUC stageAttemptId + taskAttemptNumber shall probably define a unique task 
attempt, and it carries enough information to know how many failed attempts you 
had previously.


was (Author: jiangxb1987):
IIUC stageAttemptId + taskAttemptId shall probably define a unique task 
attempt, and it carries enough information to know how many failed attempts you 
had previously.
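
A small illustration of that point (an assumption-level sketch, not Spark's own 
bookkeeping; it relies on TaskContext.stageAttemptNumber, which newer Spark 
versions expose):
{code}
// Combining the stage attempt with the task attempt number yields an identifier that
// stays unique across stage retries, unlike the task attempt number alone.
import org.apache.spark.TaskContext

def uniqueTaskAttemptId(): String = {
  val ctx = TaskContext.get()
  s"stage=${ctx.stageId()}.${ctx.stageAttemptNumber()}-partition=${ctx.partitionId()}-attempt=${ctx.attemptNumber()}"
}
{code}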

> Task attempt numbers are reused when stages are retried
> ---
>
> Key: SPARK-24552
> URL: https://issues.apache.org/jira/browse/SPARK-24552
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: Ryan Blue
>Priority: Major
>
> When stages are retried due to shuffle failures, task attempt numbers are 
> reused. This causes a correctness bug in the v2 data sources write path.
> Data sources (both the original and v2) pass the task attempt to writers so 
> that writers can use the attempt number to track and clean up data from 
> failed or speculative attempts. In the v2 docs for DataWriterFactory, the 
> attempt number's javadoc states that "Implementations can use this attempt 
> number to distinguish writers of different task attempts."
> When two attempts of a stage use the same (partition, attempt) pair, two 
> tasks can create the same data and attempt to commit. The commit coordinator 
> prevents both from committing and will abort the attempt that finishes last. 
> When using the (partition, attempt) pair to track data, the aborted task may 
> delete data associated with the (partition, attempt) pair. If that happens, 
> the data for the task that committed is also deleted as well, which is a 
> correctness bug.
> For a concrete example, I have a data source that creates files in place 
> named with {{part---.}}. Because these 
> files are written in place, both tasks create the same file and the one that 
> is aborted deletes the file, leading to data corruption when the file is 
> added to the table.






[jira] [Commented] (SPARK-24552) Task attempt numbers are reused when stages are retried

2018-06-13 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511695#comment-16511695
 ] 

Jiang Xingbo commented on SPARK-24552:
--

IIUC stageAttemptId + taskAttemptId shall probably define a unique task 
attempt, and it carries enough information to know how many failed attempts you 
had previously.

> Task attempt numbers are reused when stages are retried
> ---
>
> Key: SPARK-24552
> URL: https://issues.apache.org/jira/browse/SPARK-24552
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: Ryan Blue
>Priority: Major
>
> When stages are retried due to shuffle failures, task attempt numbers are 
> reused. This causes a correctness bug in the v2 data sources write path.
> Data sources (both the original and v2) pass the task attempt to writers so 
> that writers can use the attempt number to track and clean up data from 
> failed or speculative attempts. In the v2 docs for DataWriterFactory, the 
> attempt number's javadoc states that "Implementations can use this attempt 
> number to distinguish writers of different task attempts."
> When two attempts of a stage use the same (partition, attempt) pair, two 
> tasks can create the same data and attempt to commit. The commit coordinator 
> prevents both from committing and will abort the attempt that finishes last. 
> When using the (partition, attempt) pair to track data, the aborted task may 
> delete data associated with the (partition, attempt) pair. If that happens, 
> the data for the task that committed is also deleted as well, which is a 
> correctness bug.
> For a concrete example, I have a data source that creates files in place 
> named with {{part---.}}. Because these 
> files are written in place, both tasks create the same file and the one that 
> is aborted deletes the file, leading to data corruption when the file is 
> added to the table.






[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again

2018-06-11 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508832#comment-16508832
 ] 

Jiang Xingbo commented on SPARK-24387:
--

{quote}So I think there's a race condition that the backend may make offers 
before killing the executor. And since this is the only executor left, it's 
offered to the TaskScheduler and the retried task is scheduled to it.{quote}
IIUC removing an executor due to a heartbeat timeout will be treated as a 
SlaveLost, which triggers a task failure for each task running on that 
executor and therefore blacklists those tasks from running on that executor 
again, so why can we offer the executor to the retried task again?

> Heartbeat-timeout executor is added back and used again
> ---
>
> Key: SPARK-24387
> URL: https://issues.apache.org/jira/browse/SPARK-24387
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> In our job, when there's only one task and one executor running, the 
> executor's heartbeat is lost and driver decides to remove it. However, the 
> executor is added again and the task's retry attempt is scheduled to that 
> executor, almost immediately after the executor is marked as lost.






[jira] [Commented] (SPARK-24492) Endless attempted task when TaskCommitDenied exception writing to S3A

2018-06-08 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16506232#comment-16506232
 ] 

Jiang Xingbo commented on SPARK-24492:
--

It is possible that one task attempt acquires permission to commit output but 
doesn't finish performing the commit for a while. In the meantime, another 
attempt of the same task (e.g. a speculative run) may also ask to commit but 
fail with a TaskCommitDenied exception. In this case the current behavior of 
retrying without counting the failure towards the task failure count can lead to 
the task being retried indefinitely until it gets permission to commit, which 
can waste a lot of resources if the task is short. Instead, I propose to skip 
retrying the task attempt in case of a TaskCommitDenied exception, since that 
means another attempt is performing the commit for the same task, and we can 
wait until it finishes (if the commit finishes successfully then nothing is left 
to be done; if it fails then we can still retry).

> Endless attempted task when TaskCommitDenied exception writing to S3A
> -
>
> Key: SPARK-24492
> URL: https://issues.apache.org/jira/browse/SPARK-24492
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yu-Jhe Li
>Priority: Critical
> Attachments: retry_stage.png, 螢幕快照 2018-05-16 上午11.10.46.png, 螢幕快照 
> 2018-05-16 上午11.10.57.png
>
>
> Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and 
> output files to S3, some tasks retry endlessly and all of them fail with a 
> TaskCommitDenied exception. This happened when we ran the Spark application on 
> instances with network issues (it runs well on healthy spot instances).
> Sorry, I can't find an easy way to reproduce this issue; here's all I can 
> provide.
> The Spark UI shows (in the attachments) that one task of stage 112 failed due to 
> a FetchFailedException (a network issue) and a retry of stage 112 (retry 1) was 
> attempted. But in stage 112 (retry 1), all tasks failed due to a 
> TaskCommitDenied exception and kept retrying (they never succeed and cause lots 
> of S3 requests).
> On the other side, the driver logs show:
>  # task 123.0 in stage 112.0 failed due to FetchFailedException (a network 
> issue caused a corrupted file)
>  # a warning message from OutputCommitCoordinator
>  # task 92.0 in stage 112.1 failed when writing rows
>  # the failed tasks keep retrying, but never succeed
> {noformat}
> 2018-05-16 02:38:055 WARN  TaskSetManager:66 - Lost task 123.0 in stage 112.0 
> (TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, 
> 10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message=
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:80)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
> at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at 

[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-06-06 Thread Jiang Xingbo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503647#comment-16503647
 ] 

Jiang Xingbo commented on SPARK-24375:
--

The major problem is that tasks in the same stage of an MPI workload may rely 
on the intermediate results of other concurrently running peer tasks to compute 
the final results. Thus, when a task fails, other tasks in the same stage may 
generate incorrect results or even hang, so it seems straightforward to just 
retry the whole stage on task failure.

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-05-24 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489764#comment-16489764
 ] 

Jiang Xingbo commented on SPARK-24375:
--

We propose to add a new RDDBarrier and BarrierTaskContext to support barrier 
scheduling in Spark; this also requires modifying how job scheduling works a 
bit to accommodate the new feature.

 

*Barrier Stage*: A barrier stage doesn't launch any of its tasks until the 
available slots (free CPU cores that can be used to launch pending tasks) are 
enough to launch all of its tasks at the same time, and it always retries the 
whole stage when any task fails. One way to identify whether a stage is a 
barrier stage is to trace the RDDs the stage runs on: if the stage's RDD, or at 
least one of its ancestor RDDs, is an RDDBarrier, then the stage is a barrier 
stage; the tracing stops at ShuffleRDDs.

 

*Schedule Barrier Tasks*: Currently the TaskScheduler schedules pending tasks 
on available slots on a best-effort basis, so normally not all tasks in the 
same stage get launched at the same time. We may add a check of the total 
available slots before scheduling tasks from a barrier stage taskset. It is 
still possible that only part of a barrier stage taskset gets launched due to 
task locality issues, so we have to check again before launch to ensure that 
all tasks in the same barrier stage get launched at the same time.

If we consider scheduling several jobs at the same time (both barrier and 
regular jobs), barrier tasks may be blocked by regular tasks (when the 
available slots are always fewer than a barrier stage taskset requires), or one 
barrier stage taskset may block another barrier stage taskset (when a barrier 
stage taskset that requires fewer slots tends to be scheduled earlier). 
Currently we don't have a perfect solution for all these scenarios, but we can 
at least avoid the worst case of a huge barrier stage taskset being blocked 
forever on a busy cluster by using a time-based weight approach (conceptually, 
a taskset that has been pending longer is assigned a greater priority weight 
for scheduling).

 

*Task Barrier*: Barrier tasks shall allow users to insert a synchronization 
point in the middle of task execution; this can be achieved by introducing a 
global barrier operation in TaskContext, which makes the current task wait 
until all tasks in the same stage hit this barrier (a minimal usage sketch 
follows).
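
A minimal usage sketch of the proposed API (the exact entry points, e.g. 
RDD.barrier(), BarrierTaskContext.get() and its barrier() method, are 
assumptions of this sketch and may change as the design evolves):
{code}
import org.apache.spark.BarrierTaskContext

// Every task of the barrier stage blocks at context.barrier() until all tasks
// in the same stage have reached that point, then all of them proceed to phase 2.
val rdd = sc.parallelize(1 to 100, numSlices = 4)
val sums = rdd.barrier().mapPartitions { iter =>
  val context = BarrierTaskContext.get()
  val localRows = iter.toArray      // phase 1: each task processes its own partition
  context.barrier()                 // global sync point across all tasks of the stage
  Iterator(localRows.sum)           // phase 2: runs only after every task passed the barrier
}
sums.collect()
{code}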

 

*Task Failure*: To ensure correctness, a barrier stage always retries the whole 
stage when any task fails. Thus, it's quite straightforward that we shall kill 
all the running tasks of a failed stage, which also guarantees that at most one 
taskset is running for each single stage (no zombie tasks).

 

*Speculative Task*: Since we require launching all tasks in a barrier stage at 
the same time, there is no need to launch speculative tasks for a barrier stage 
taskset.

 

*Share TaskInfo*: To share information between tasks in a barrier stage, we may 
update it in `TaskContext.localProperties`.


*Python Support*: Expose RDDBarrier and BarrierTaskContext to pyspark.

 

[~cloud_fan] maybe you want to give additional information I didn't cover 
above? (esp. PySpark)

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24340) Clean up non-shuffle disk block manager files following executor death

2018-05-22 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-24340:


 Summary: Clean up non-shuffle disk block manager files following 
executor death
 Key: SPARK-24340
 URL: https://issues.apache.org/jira/browse/SPARK-24340
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


Currently we only clean up local folders when an application is removed, and we 
don't clean up non-shuffle files, such as temporary shuffle blocks, cached 
RDD/broadcast blocks, spill files, etc.; this can cause disk space leaks when 
executors periodically die and are replaced.

To avoid this source of disk space leaks, we can clean up an executor's disk 
store files, except for shuffle index and data files, when the executor 
finishes (a small sketch follows).
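
A small sketch of the proposed cleanup rule (the shuffle file-name pattern 
below is an assumption used for illustration; a real implementation would rely 
on the block manager's own bookkeeping):
{code}
import java.io.File

// Keep shuffle data/index files: they may still be served to other executors via
// the external shuffle service. Everything else under the executor's local dirs
// (temp shuffle blocks, cached RDD/broadcast blocks, spill files) can be deleted.
def shouldKeepOnExecutorExit(file: File): Boolean = {
  val name = file.getName
  name.startsWith("shuffle_") && (name.endsWith(".data") || name.endsWith(".index"))
}

def listFilesRecursively(dir: File): Seq[File] = {
  val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
  children.filter(_.isFile) ++ children.filter(_.isDirectory).flatMap(listFilesRecursively)
}

def cleanupExecutorDirs(localDirs: Seq[File]): Unit =
  for (dir <- localDirs; file <- listFilesRecursively(dir) if !shouldKeepOnExecutorExit(file)) {
    file.delete()
  }
{code}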

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23881) Flaky test: JobCancellationSuite."interruptible iterator of shuffle reader"

2018-04-06 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-23881:


 Summary: Flaky test: JobCancellationSuite."interruptible iterator 
of shuffle reader"
 Key: SPARK-23881
 URL: https://issues.apache.org/jira/browse/SPARK-23881
 Project: Spark
  Issue Type: Test
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo


The test JobCancellationSuite."interruptible iterator of shuffle reader" has 
been flaky:

*branch-2.3*
 * 
[https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/lastCompletedBuild/testReport/org.apache.spark/JobCancellationSuite/interruptible_iterator_of_shuffle_reader/]

*master*
 * 
[https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/4301/testReport/junit/org.apache.spark/JobCancellationSuite/interruptible_iterator_of_shuffle_reader/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23525) ALTER TABLE CHANGE COLUMN doesn't work for external hive table

2018-02-27 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378546#comment-16378546
 ] 

Jiang Xingbo commented on SPARK-23525:
--

I'm working on a fix for it, and will try to backport the fix to 2.2.

> ALTER TABLE CHANGE COLUMN doesn't work for external hive table
> --
>
> Key: SPARK-23525
> URL: https://issues.apache.org/jira/browse/SPARK-23525
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Pavlo Skliar
>Priority: Major
>
> {code:java}
> print(spark.sql("""
> SHOW CREATE TABLE test.trends
> """).collect()[0].createtab_stmt)
> /// OUTPUT
> CREATE EXTERNAL TABLE `test`.`trends`(`id` string COMMENT '', `metric` string 
> COMMENT '', `amount` bigint COMMENT '')
> COMMENT ''
> PARTITIONED BY (`date` string COMMENT '')
> ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> WITH SERDEPROPERTIES (
>   'serialization.format' = '1'
> )
> STORED AS
>   INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
>   OUTPUTFORMAT 
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION 's3://x/x/'
> TBLPROPERTIES (
>   'transient_lastDdlTime' = '1519729384',
>   'last_modified_time' = '1519645652',
>   'last_modified_by' = 'pavlo',
>   'last_castor_run_ts' = '1513561658.0'
> )
> spark.sql("""
> DESCRIBE test.trends
> """).collect()
> // OUTPUT
> [Row(col_name='id', data_type='string', comment=''),
>  Row(col_name='metric', data_type='string', comment=''),
>  Row(col_name='amount', data_type='bigint', comment=''),
>  Row(col_name='date', data_type='string', comment=''),
>  Row(col_name='# Partition Information', data_type='', comment=''),
>  Row(col_name='# col_name', data_type='data_type', comment='comment'),
>  Row(col_name='date', data_type='string', comment='')]
> spark.sql("""alter table test.trends change column id id string comment 
> 'unique identifier'""")
> spark.sql("""
> DESCRIBE test.trends
> """).collect()
> // OUTPUT
> [Row(col_name='id', data_type='string', comment=''), Row(col_name='metric', 
> data_type='string', comment=''), Row(col_name='amount', data_type='bigint', 
> comment=''), Row(col_name='date', data_type='string', comment=''), 
> Row(col_name='# Partition Information', data_type='', comment=''), 
> Row(col_name='# col_name', data_type='data_type', comment='comment'), 
> Row(col_name='date', data_type='string', comment='')]
> {code}
> The strange thing is that I've assigned a comment to the id field from Hive 
> successfully, and it's visible in the Hue UI, but it's still not visible from 
> Spark, and Spark requests don't have any effect on the comments.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23525) ALTER TABLE CHANGE COLUMN doesn't work for external hive table

2018-02-27 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378533#comment-16378533
 ] 

Jiang Xingbo commented on SPARK-23525:
--

Thank you for reporting this. I believe the bug is caused by: 
https://github.com/apache/spark/blob/8077bb04f350fd35df83ef896135c0672dc3f7b0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L613

> ALTER TABLE CHANGE COLUMN doesn't work for external hive table
> --
>
> Key: SPARK-23525
> URL: https://issues.apache.org/jira/browse/SPARK-23525
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Pavlo Skliar
>Priority: Major
>
> {code:java}
> print(spark.sql("""
> SHOW CREATE TABLE test.trends
> """).collect()[0].createtab_stmt)
> /// OUTPUT
> CREATE EXTERNAL TABLE `test`.`trends`(`id` string COMMENT '', `metric` string 
> COMMENT '', `amount` bigint COMMENT '')
> COMMENT ''
> PARTITIONED BY (`date` string COMMENT '')
> ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> WITH SERDEPROPERTIES (
>   'serialization.format' = '1'
> )
> STORED AS
>   INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
>   OUTPUTFORMAT 
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION 's3://x/x/'
> TBLPROPERTIES (
>   'transient_lastDdlTime' = '1519729384',
>   'last_modified_time' = '1519645652',
>   'last_modified_by' = 'pavlo',
>   'last_castor_run_ts' = '1513561658.0'
> )
> spark.sql("""
> DESCRIBE test.trends
> """).collect()
> // OUTPUT
> [Row(col_name='id', data_type='string', comment=''),
>  Row(col_name='metric', data_type='string', comment=''),
>  Row(col_name='amount', data_type='bigint', comment=''),
>  Row(col_name='date', data_type='string', comment=''),
>  Row(col_name='# Partition Information', data_type='', comment=''),
>  Row(col_name='# col_name', data_type='data_type', comment='comment'),
>  Row(col_name='date', data_type='string', comment='')]
> spark.sql("""alter table test.trends change column id id string comment 
> 'unique identifier'""")
> spark.sql("""
> DESCRIBE test.trends
> """).collect()
> // OUTPUT
> [Row(col_name='id', data_type='string', comment=''), Row(col_name='metric', 
> data_type='string', comment=''), Row(col_name='amount', data_type='bigint', 
> comment=''), Row(col_name='date', data_type='string', comment=''), 
> Row(col_name='# Partition Information', data_type='', comment=''), 
> Row(col_name='# col_name', data_type='data_type', comment='comment'), 
> Row(col_name='date', data_type='string', comment='')]
> {code}
> The strange thing is that I've assigned a comment to the id field from Hive 
> successfully, and it's visible in the Hue UI, but it's still not visible from 
> Spark, and Spark requests don't have any effect on the comments.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23525) ALTER TABLE CHANGE COLUMN doesn't work for external hive table

2018-02-27 Thread Jiang Xingbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-23525:
-
Affects Version/s: 2.3.0
 Priority: Major  (was: Minor)
  Summary: ALTER TABLE CHANGE COLUMN doesn't work for external hive 
table  (was: Update column comment doesn't work from spark)

> ALTER TABLE CHANGE COLUMN doesn't work for external hive table
> --
>
> Key: SPARK-23525
> URL: https://issues.apache.org/jira/browse/SPARK-23525
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Pavlo Skliar
>Priority: Major
>
> {code:java}
> print(spark.sql("""
> SHOW CREATE TABLE test.trends
> """).collect()[0].createtab_stmt)
> /// OUTPUT
> CREATE EXTERNAL TABLE `test`.`trends`(`id` string COMMENT '', `metric` string 
> COMMENT '', `amount` bigint COMMENT '')
> COMMENT ''
> PARTITIONED BY (`date` string COMMENT '')
> ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> WITH SERDEPROPERTIES (
>   'serialization.format' = '1'
> )
> STORED AS
>   INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
>   OUTPUTFORMAT 
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION 's3://x/x/'
> TBLPROPERTIES (
>   'transient_lastDdlTime' = '1519729384',
>   'last_modified_time' = '1519645652',
>   'last_modified_by' = 'pavlo',
>   'last_castor_run_ts' = '1513561658.0'
> )
> spark.sql("""
> DESCRIBE test.trends
> """).collect()
> // OUTPUT
> [Row(col_name='id', data_type='string', comment=''),
>  Row(col_name='metric', data_type='string', comment=''),
>  Row(col_name='amount', data_type='bigint', comment=''),
>  Row(col_name='date', data_type='string', comment=''),
>  Row(col_name='# Partition Information', data_type='', comment=''),
>  Row(col_name='# col_name', data_type='data_type', comment='comment'),
>  Row(col_name='date', data_type='string', comment='')]
> spark.sql("""alter table test.trends change column id id string comment 
> 'unique identifier'""")
> spark.sql("""
> DESCRIBE test.trends
> """).collect()
> // OUTPUT
> [Row(col_name='id', data_type='string', comment=''), Row(col_name='metric', 
> data_type='string', comment=''), Row(col_name='amount', data_type='bigint', 
> comment=''), Row(col_name='date', data_type='string', comment=''), 
> Row(col_name='# Partition Information', data_type='', comment=''), 
> Row(col_name='# col_name', data_type='data_type', comment='comment'), 
> Row(col_name='date', data_type='string', comment='')]
> {code}
> The strange thing is that I've assigned a comment to the id field from Hive 
> successfully, and it's visible in the Hue UI, but it's still not visible from 
> Spark, and Spark requests don't have any effect on the comments.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23139) Read eventLog file with mixed encodings

2018-02-07 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356507#comment-16356507
 ] 

Jiang Xingbo edited comment on SPARK-23139 at 2/8/18 5:25 AM:
--


{quote}EventLog may contain mixed encodings such as custom exception 
message{quote}


Could you please elaborate on how this happened?


was (Author: jiangxb1987):
```
EventLog may contain mixed encodings such as custom exception message
```

Could you please elaborate on how this happened?

> Read eventLog file with mixed encodings
> ---
>
> Key: SPARK-23139
> URL: https://issues.apache.org/jira/browse/SPARK-23139
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: DENG FEI
>Priority: Major
>
> An EventLog file may contain mixed encodings, such as in a custom exception 
> message, which causes the replay to fail.
> java.nio.charset.MalformedInputException: Input length = 1
> at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
>  at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
>  at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>  at java.io.InputStreamReader.read(InputStreamReader.java:184)
>  at java.io.BufferedReader.fill(BufferedReader.java:161)
>  at java.io.BufferedReader.readLine(BufferedReader.java:324)
>  at java.io.BufferedReader.readLine(BufferedReader.java:389)
>  at 
> scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
>  at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:836)
>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:78)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:694)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:507)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$4$$anon$4.run(FsHistoryProvider.scala:399)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23139) Read eventLog file with mixed encodings

2018-02-07 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356507#comment-16356507
 ] 

Jiang Xingbo commented on SPARK-23139:
--

```
EventLog may contain mixed encodings such as custom exception message
```

Could you please elaborate on how this happened?

> Read eventLog file with mixed encodings
> ---
>
> Key: SPARK-23139
> URL: https://issues.apache.org/jira/browse/SPARK-23139
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: DENG FEI
>Priority: Major
>
> An EventLog file may contain mixed encodings, such as in a custom exception 
> message, which causes the replay to fail.
> java.nio.charset.MalformedInputException: Input length = 1
> at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
>  at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
>  at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>  at java.io.InputStreamReader.read(InputStreamReader.java:184)
>  at java.io.BufferedReader.fill(BufferedReader.java:161)
>  at java.io.BufferedReader.readLine(BufferedReader.java:324)
>  at java.io.BufferedReader.readLine(BufferedReader.java:389)
>  at 
> scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
>  at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:836)
>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:78)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:694)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:507)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$4$$anon$4.run(FsHistoryProvider.scala:399)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23330) Spark UI SQL executions page throws NPE

2018-02-03 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-23330:


 Summary: Spark UI SQL executions page throws NPE
 Key: SPARK-23330
 URL: https://issues.apache.org/jira/browse/SPARK-23330
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


Spark UI SQL executions page throws the following error and the page crashes:
```
HTTP ERROR 500
Problem accessing /SQL/. Reason:

Server Error
Caused by:
java.lang.NullPointerException
at 
scala.collection.immutable.StringOps$.length$extension(StringOps.scala:47)
at scala.collection.immutable.StringOps.length(StringOps.scala:47)
at 
scala.collection.IndexedSeqOptimized$class.isEmpty(IndexedSeqOptimized.scala:27)
at scala.collection.immutable.StringOps.isEmpty(StringOps.scala:29)
at 
scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111)
at scala.collection.immutable.StringOps.nonEmpty(StringOps.scala:29)
at 
org.apache.spark.sql.execution.ui.ExecutionTable.descriptionCell(AllExecutionsPage.scala:182)
at 
org.apache.spark.sql.execution.ui.ExecutionTable.row(AllExecutionsPage.scala:155)
at 
org.apache.spark.sql.execution.ui.ExecutionTable$$anonfun$8.apply(AllExecutionsPage.scala:204)
at 
org.apache.spark.sql.execution.ui.ExecutionTable$$anonfun$8.apply(AllExecutionsPage.scala:204)
at 
org.apache.spark.ui.UIUtils$$anonfun$listingTable$2.apply(UIUtils.scala:339)
at 
org.apache.spark.ui.UIUtils$$anonfun$listingTable$2.apply(UIUtils.scala:339)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.ui.UIUtils$.listingTable(UIUtils.scala:339)
at 
org.apache.spark.sql.execution.ui.ExecutionTable.toNodeSeq(AllExecutionsPage.scala:203)
at 
org.apache.spark.sql.execution.ui.AllExecutionsPage.render(AllExecutionsPage.scala:67)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at 
org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at 
org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.eclipse.jetty.server.Server.handle(Server.java:534)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
at 
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at 
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:108)
at 
org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)
```
Seems the bug was introduced by 
https://github.com/apache/spark/pull/19681/files#diff-a74d84702d8d47d5269e96740a55a3caR63
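
A minimal illustration of the failure mode, assuming the NPE comes from calling 
.nonEmpty on a null description string (the guard below is a sketch, not the 
actual fix):
{code}
// StringOps.length on a null String is what throws the NPE in the stack trace
// above, so the cell rendering needs a null check before the emptiness check.
val description: String = null                                // e.g. an execution submitted without a description
// description.nonEmpty                                       // would throw java.lang.NullPointerException
val hasDescription = Option(description).exists(_.nonEmpty)   // null-safe: returns false instead
{code}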



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: 

[jira] [Created] (SPARK-23243) Shuffle+Repartition on an RDD could lead to incorrect answers

2018-01-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-23243:


 Summary: Shuffle+Repartition on an RDD could lead to incorrect 
answers
 Key: SPARK-23243
 URL: https://issues.apache.org/jira/browse/SPARK-23243
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0, 2.1.0, 2.0.0, 1.6.0, 2.3.0
Reporter: Jiang Xingbo


RDD repartition also uses the round-robin way to distribute data, so it can 
also cause incorrect answers for RDD workloads, in a similar way to 
https://issues.apache.org/jira/browse/SPARK-23207

The approach that fixes DataFrame.repartition() doesn't apply to the RDD 
repartition issue, as discussed in 
https://github.com/apache/spark/pull/20393#issuecomment-360912451

We track alternative solutions for this issue in this task.
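
A small, self-contained illustration (plain Scala, not Spark code) of why 
round-robin distribution is order-sensitive: the same rows, arriving in a 
different order after a retried upstream task, land in different target 
partitions.
{code}
def roundRobin[T](rows: Seq[T], numPartitions: Int): Map[Int, Seq[T]] =
  rows.zipWithIndex
    .groupBy { case (_, idx) => idx % numPartitions }
    .map { case (partition, pairs) => partition -> pairs.map(_._1) }

val firstRun   = roundRobin(Seq("a", "b", "c", "d"), 2)  // 0 -> (a, c), 1 -> (b, d)
val retriedRun = roundRobin(Seq("b", "a", "c", "d"), 2)  // 0 -> (b, c), 1 -> (a, d)
// If only some downstream tasks are retried, they fetch rows based on the second
// assignment while the others keep the first, so rows can be lost or duplicated.
{code}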



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23207) Shuffle+Repartition on an DataFrame could lead to incorrect answers

2018-01-26 Thread Jiang Xingbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-23207:
-
Summary: Shuffle+Repartition on an DataFrame could lead to incorrect 
answers  (was: Shuffle+Repartition on an RDD/DataFrame could lead to Data Loss)

> Shuffle+Repartition on an DataFrame could lead to incorrect answers
> ---
>
> Key: SPARK-23207
> URL: https://issues.apache.org/jira/browse/SPARK-23207
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jiang Xingbo
>Assignee: Jiang Xingbo
>Priority: Blocker
>
> Currently shuffle repartition uses RoundRobinPartitioning, the generated 
> result is nondeterministic since the sequence of input rows are not 
> determined.
> The bug can be triggered when there is a repartition call following a shuffle 
> (which would lead to non-deterministic row ordering), as the pattern shows 
> below:
> upstream stage -> repartition stage -> result stage
> (-> indicate a shuffle)
> When one of the executors process goes down, some tasks on the repartition 
> stage will be retried and generate inconsistent ordering, and some tasks of 
> the result stage will be retried generating different data.
> The following code returns 931532, instead of 100:
> {code}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
>   x
> }.repartition(200).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
> throw new Exception("pkill -f java".!!)
>   }
>   x
> }
> res.distinct().count()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23207) Shuffle+Repartition on an RDD/DataFrame could lead to Data Loss

2018-01-24 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16338419#comment-16338419
 ] 

Jiang Xingbo commented on SPARK-23207:
--

I'm working on this.

> Shuffle+Repartition on an RDD/DataFrame could lead to Data Loss
> ---
>
> Key: SPARK-23207
> URL: https://issues.apache.org/jira/browse/SPARK-23207
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> Currently shuffle repartition uses RoundRobinPartitioning, the generated 
> result is nondeterministic since the sequence of input rows are not 
> determined.
> The bug can be triggered when there is a repartition call following a shuffle 
> (which would lead to non-deterministic row ordering), as the pattern shows 
> below:
> upstream stage -> repartition stage -> result stage
> (-> indicate a shuffle)
> When one of the executors process goes down, some tasks on the repartition 
> stage will be retried and generate inconsistent ordering, and some tasks of 
> the result stage will be retried generating different data.
> The following code returns 931532, instead of 100:
> {code}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
>   x
> }.repartition(200).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
> throw new Exception("pkill -f java".!!)
>   }
>   x
> }
> res.distinct().count()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23207) Shuffle+Repartition on an RDD/DataFrame could lead to Data Loss

2018-01-24 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-23207:


 Summary: Shuffle+Repartition on an RDD/DataFrame could lead to 
Data Loss
 Key: SPARK-23207
 URL: https://issues.apache.org/jira/browse/SPARK-23207
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


Currently shuffle repartition uses RoundRobinPartitioning, the generated result 
is nondeterministic since the sequence of input rows are not determined.

The bug can be triggered when there is a repartition call following a shuffle 
(which would lead to non-deterministic row ordering), as the pattern shows 
below:
upstream stage -> repartition stage -> result stage
(-> indicate a shuffle)
When one of the executors process goes down, some tasks on the repartition 
stage will be retried and generate inconsistent ordering, and some tasks of the 
result stage will be retried generating different data.

The following code returns 931532, instead of 100:
{code}
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23188) Make vectorized columnar reader batch size configurable

2018-01-23 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-23188:


 Summary: Make vectorized columnar reader batch size configurable
 Key: SPARK-23188
 URL: https://issues.apache.org/jira/browse/SPARK-23188
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22360) Add unit test for Window Specifications

2018-01-19 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332773#comment-16332773
 ] 

Jiang Xingbo commented on SPARK-22360:
--

Created https://issues.apache.org/jira/browse/SPARK-23160

> Add unit test for Window Specifications
> ---
>
> Key: SPARK-22360
> URL: https://issues.apache.org/jira/browse/SPARK-22360
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> * different partition clauses (none, one, multiple)
> * different order clauses (none, one, multiple, asc/desc, nulls first/last)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23160) Add more window sql tests

2018-01-19 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-23160:


 Summary: Add more window sql tests
 Key: SPARK-23160
 URL: https://issues.apache.org/jira/browse/SPARK-23160
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


We should also cover the window SQL interface, for example in 
`sql/core/src/test/resources/sql-tests/inputs/window.sql`; it would also be 
interesting to see whether we can generate results consistent with window tests 
in other major databases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22360) Add unit test for Window Specifications

2018-01-19 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332762#comment-16332762
 ] 

Jiang Xingbo commented on SPARK-22360:
--

Sorry for the late response. It's great that we can cover the DataFrame test 
cases; I really think we should have them soon. Besides, we should also cover 
the window SQL interface, for example in 
`sql/core/src/test/resources/sql-tests/inputs/window.sql`; it would also be 
interesting to see whether we can generate results consistent with window tests 
in other major databases.

[~smilegator] WDYT?

> Add unit test for Window Specifications
> ---
>
> Key: SPARK-22360
> URL: https://issues.apache.org/jira/browse/SPARK-22360
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> * different partition clauses (none, one, multiple)
> * different order clauses (none, one, multiple, asc/desc, nulls first/last)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22297) Flaky test: BlockManagerSuite "Shuffle registration timeout and maxAttempts conf"

2018-01-01 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307501#comment-16307501
 ] 

Jiang Xingbo commented on SPARK-22297:
--

How often do we run into this? Personally I can't reproduce this test failure 
in my local environment.

> Flaky test: BlockManagerSuite "Shuffle registration timeout and maxAttempts 
> conf"
> -
>
> Key: SPARK-22297
> URL: https://issues.apache.org/jira/browse/SPARK-22297
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Tests
>Affects Versions: 2.3.0
>Reporter: Marcelo Vanzin
>Priority: Minor
>
> Ran into this locally; the test code seems to use timeouts which generally 
> end up in flakiness like this.
> {noformat}
> [info] - SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
> working *** FAILED *** (1 second, 203 milliseconds)
> [info]   "Unable to register with external shuffle server due to : 
> java.util.concurrent.TimeoutException: Timeout waiting for task." did not 
> contain "test_spark_20640_try_again" (BlockManagerSuite.scala:1370)
> [info]   org.scalatest.exceptions.TestFailedException:
> [info]   at 
> org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
> [info]   at 
> org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
> [info]   at 
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
> [info]   at 
> org.apache.spark.storage.BlockManagerSuite$$anonfun$14.apply$mcV$sp(BlockManagerSuite.scala:1370)
> [info]   at 
> org.apache.spark.storage.BlockManagerSuite$$anonfun$14.apply(BlockManagerSuite.scala:1323)
> [info]   at 
> org.apache.spark.storage.BlockManagerSuite$$anonfun$14.apply(BlockManagerSuite.scala:1323)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22359) Improve the test coverage of window functions

2017-12-13 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289363#comment-16289363
 ] 

Jiang Xingbo commented on SPARK-22359:
--

[~smurakozi] Please feel free to open a PR for this.

> Improve the test coverage of window functions
> -
>
> Key: SPARK-22359
> URL: https://issues.apache.org/jira/browse/SPARK-22359
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jiang Xingbo
>
> There are already quite a few integration tests using window functions, but 
> the unit test coverage for window functions is not ideal.
> We'd like to test the following aspects:
> * Specifications
> ** different partition clauses (none, one, multiple)
> ** different order clauses (none, one, multiple, asc/desc, nulls first/last)
> * Frames and their combinations
> ** OffsetWindowFunctionFrame
> ** UnboundedWindowFunctionFrame
> ** SlidingWindowFunctionFrame
> ** UnboundedPrecedingWindowFunctionFrame
> ** UnboundedFollowingWindowFunctionFrame
> * Aggregate function types
> ** Declarative
> ** Imperative
> ** UDAF
> * Spilling
> ** Cover the conditions that WindowExec should spill at least once 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22757) Init-container in the driver/executor pods for downloading remote dependencies

2017-12-13 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288856#comment-16288856
 ] 

Jiang Xingbo commented on SPARK-22757:
--

Is this also targeted for the 2.3 release?

> Init-container in the driver/executor pods for downloading remote dependencies
> --
>
> Key: SPARK-22757
> URL: https://issues.apache.org/jira/browse/SPARK-22757
> Project: Spark
>  Issue Type: Sub-task
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Yinan Li
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22680) SparkSQL scan all partitions when the specified partitions are not exists in parquet formatted table

2017-12-06 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281235#comment-16281235
 ] 

Jiang Xingbo commented on SPARK-22680:
--

Could you also post the result of EXPLAIN? Thanks!

> SparkSQL scan all partitions when the specified partitions are not exists in 
> parquet formatted table
> 
>
> Key: SPARK-22680
> URL: https://issues.apache.org/jira/browse/SPARK-22680
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.2.0
> Environment: spark2.0.2 spark2.2.0
>Reporter: Xiaochen Ouyang
>
> 1. spark-sql --master local[2]
> 2. create external table test (id int,name string) partitioned by (country 
> string,province string, day string,hour int) stored as parquet location 
> '/warehouse/test';
> 3. produce data into table test
> 4. select count(1) from test where country = '185' and province = '021' and 
> day = '2017-11-12' and hour = 10; if the 4 filter conditions do not exist in 
> HDFS and the MetaStore [MySQL], this SQL will scan all partitions of table test



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22363) Add unit test for Window spilling

2017-10-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22363:


 Summary: Add unit test for Window spilling
 Key: SPARK-22363
 URL: https://issues.apache.org/jira/browse/SPARK-22363
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


Cover the scenarios where WindowExec should spill at least once.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22362) Add unit test for Window Aggregate Functions

2017-10-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22362:


 Summary: Add unit test for Window Aggregate Functions
 Key: SPARK-22362
 URL: https://issues.apache.org/jira/browse/SPARK-22362
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


* Declarative
* Imperative
* UDAF



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22361) Add unit test for Window Frames

2017-10-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22361:


 Summary: Add unit test for Window Frames
 Key: SPARK-22361
 URL: https://issues.apache.org/jira/browse/SPARK-22361
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


* OffsetWindowFunctionFrame
* UnboundedWindowFunctionFrame
* SlidingWindowFunctionFrame
* UnboundedPrecedingWindowFunctionFrame
* UnboundedFollowingWindowFunctionFrame



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22360) Add unit test for Window Specifications

2017-10-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22360:


 Summary: Add unit test for Window Specifications
 Key: SPARK-22360
 URL: https://issues.apache.org/jira/browse/SPARK-22360
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


* different partition clauses (none, one, multiple)
* different order clauses (none, one, multiple, asc/desc, nulls first/last)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22359) Improve the test coverage of window functions

2017-10-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22359:


 Summary: Improve the test coverage of window functions
 Key: SPARK-22359
 URL: https://issues.apache.org/jira/browse/SPARK-22359
 Project: Spark
  Issue Type: Test
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


There are already quite a few integration tests using window functions, but the 
unit test coverage for window functions is not ideal.
We'd like to test the following aspects (a small example follows the list):
* Specifications
** different partition clauses (none, one, multiple)
** different order clauses (none, one, multiple, asc/desc, nulls first/last)
* Frames and their combinations
** OffsetWindowFunctionFrame
** UnboundedWindowFunctionFrame
** SlidingWindowFunctionFrame
** UnboundedPrecedingWindowFunctionFrame
** UnboundedFollowingWindowFunctionFrame
* Aggregate function types
** Declarative
** Imperative
** UDAF
* Spilling
** Cover the conditions that WindowExec should spill at least once 
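
A small example of the kind of DataFrame-level test case this could include, 
covering a partition clause, an order clause with nulls last, and a sliding 
frame (the concrete query is illustrative, not existing test code):
{code}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Sliding sum over a 2-row frame, partitioned by key, ordered by value with nulls last.
val df = Seq(("a", Some(1)), ("a", Some(2)), ("a", None), ("b", Some(10))).toDF("key", "value")

val w = Window
  .partitionBy("key")
  .orderBy(col("value").asc_nulls_last)
  .rowsBetween(Window.currentRow, 1)

df.select(col("key"), col("value"), sum("value").over(w).as("sliding_sum")).show()
{code}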



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22214) Refactor the list hive partitions code

2017-10-06 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22214:


 Summary: Refactor the list hive partitions code
 Key: SPARK-22214
 URL: https://issues.apache.org/jira/browse/SPARK-22214
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.0
Reporter: Jiang Xingbo


Refactor the code for listing Hive partitions to make it more extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21608) Window rangeBetween() API should allow literal boundary

2017-08-02 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-21608:


 Summary: Window rangeBetween() API should allow literal boundary
 Key: SPARK-21608
 URL: https://issues.apache.org/jira/browse/SPARK-21608
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.0
Reporter: Jiang Xingbo


The Window rangeBetween() API should allow literal boundaries; that means the 
window range frame can compute frames over double/date/timestamp values.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21496) Support codegen for TakeOrderedAndProjectExec

2017-07-20 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-21496:


 Summary: Support codegen for TakeOrderedAndProjectExec
 Key: SPARK-21496
 URL: https://issues.apache.org/jira/browse/SPARK-21496
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.0
Reporter: Jiang Xingbo
Priority: Minor


The operator `SortExec` supports codegen, but `TakeOrderedAndProjectExec` 
doesn't. Perhaps we should also add codegen support for 
`TakeOrderedAndProjectExec`, but we should benchmark it carefully.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21410) In RangePartitioner(partitions: Int, rdd: RDD[]), RangePartitioner.numPartitions is wrong if the number of elements in RDD (rdd.count()) is less than number of partiti

2017-07-16 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088970#comment-16088970
 ] 

Jiang Xingbo commented on SPARK-21410:
--

This is not a bug since it doesn't generate any wrong results, but it may be an 
improvement worth having.

> In RangePartitioner(partitions: Int, rdd: RDD[]), 
> RangePartitioner.numPartitions is wrong if the number of elements in RDD 
> (rdd.count()) is less than number of partitions (partitions in constructor).
> ---
>
> Key: SPARK-21410
> URL: https://issues.apache.org/jira/browse/SPARK-21410
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.1.0, 2.2.0
>Reporter: APeng Zhang
>Priority: Minor
>
> In RangePartitioner(partitions: Int, rdd: RDD[]), 
> RangePartitioner.numPartitions is wrong if the number of elements in RDD 
> (rdd.count()) is less than number of partitions (partitions in constructor).
> Code1 to reproduce:
> {code:java}
> import spark.implicits._
> val ds = spark.createDataset(Seq((1, 1)))
> println(ds.sort("_1").rdd.getNumPartitions)
> // The output of println is 2
> {code}
> Code2 to reproduce:
> {code:java}
>   test("Number of elements in RDD is less than number of partitions") {
> val rdd = sc.parallelize(1 to 3).map(x => (x, x))
> val partitioner = new RangePartitioner(22, rdd)
> assert(partitioner.numPartitions === 3)
>   }
> {code}
>   This test will fail because partitioner.numPartitions is 4.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21410) In RangePartitioner(partitions: Int, rdd: RDD[]), RangePartitioner.numPartitions is wrong if the number of elements in RDD (rdd.count()) is less than number of partition

2017-07-16 Thread Jiang Xingbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-21410:
-
Issue Type: Improvement  (was: Bug)

> In RangePartitioner(partitions: Int, rdd: RDD[]), 
> RangePartitioner.numPartitions is wrong if the number of elements in RDD 
> (rdd.count()) is less than number of partitions (partitions in constructor).
> ---
>
> Key: SPARK-21410
> URL: https://issues.apache.org/jira/browse/SPARK-21410
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.1.0, 2.2.0
>Reporter: APeng Zhang
>Priority: Minor
>
> In RangePartitioner(partitions: Int, rdd: RDD[]), 
> RangePartitioner.numPartitions is wrong if the number of elements in RDD 
> (rdd.count()) is less than number of partitions (partitions in constructor).
> Code1 to reproduce:
> {code:java}
> import spark.implicits._
> val ds = spark.createDataset(Seq((1, 1)))
> println(ds.sort("_1").rdd.getNumPartitions)
> // The output of println is 2
> {code}
> Code2 to reproduce:
> {code:java}
>   test("Number of elements in RDD is less than number of partitions") {
> val rdd = sc.parallelize(1 to 3).map(x => (x, x))
> val partitioner = new RangePartitioner(22, rdd)
> assert(partitioner.numPartitions === 3)
>   }
> {code}
>   This test will fail because partitioner.numPartitions is 4.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14151) Propose to refactor and expose Metrics Sink and Source interface

2017-07-11 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16082283#comment-16082283
 ] 

Jiang Xingbo commented on SPARK-14151:
--

Since this proposes to add a set of public APIs, it would make more sense to 
start a SPIP for this topic. [~jerryshao] Would you like to do that?

> Propose to refactor and expose Metrics Sink and Source interface
> 
>
> Key: SPARK-14151
> URL: https://issues.apache.org/jira/browse/SPARK-14151
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Saisai Shao
>Priority: Minor
>
> MetricsSystem is designed for plugging in different sources and sinks: users 
> can write their own sources and sinks, configure them through 
> metrics.properties, and MetricsSystem will register them through reflection. 
> But the current Source and Sink interfaces are private, which means users 
> cannot create their own sources and sinks unless they use the same package.
> So here we propose to expose the Source and Sink interfaces; this will let 
> users build and maintain their own sources and sinks, and alleviate the 
> maintenance overhead of the Spark codebase. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21349) Make TASK_SIZE_TO_WARN_KB configurable

2017-07-10 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081518#comment-16081518
 ] 

Jiang Xingbo commented on SPARK-21349:
--

[~dongjoon] Are you running the test for Spark SQL? Or running some 
user-defined RDD directly? This information should help us narrow down the 
scope of the problem. Thanks!

> Make TASK_SIZE_TO_WARN_KB configurable
> --
>
> Key: SPARK-21349
> URL: https://issues.apache.org/jira/browse/SPARK-21349
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.6.3, 2.2.0
>Reporter: Dongjoon Hyun
>Priority: Minor
>
> Since Spark 1.1.0, Spark emits a warning when the task size exceeds a threshold, 
> SPARK-2185. Although this is just a warning message, this issue tries to make 
> `TASK_SIZE_TO_WARN_KB` a normal Spark configuration for advanced users.
> According to the Jenkins log, we also have 123 warnings even in our unit tests.
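
As a sketch of what the change might look like, the hard-coded constant could be promoted to a config entry via Spark's internal ConfigBuilder. The key name below is illustrative, not necessarily the one the project would adopt, and ConfigBuilder is private[spark], so a real change would live inside the Spark source tree:
{code}
package org.apache.spark.internal.config

// Sketch only: the default of 100 KiB matches the current hard-coded
// TASK_SIZE_TO_WARN_KB constant; the key name is illustrative.
object TaskSizeWarnConfig {
  val TASK_SIZE_TO_WARN_KB = ConfigBuilder("spark.scheduler.taskSizeToWarnKb")
    .doc("Emit a warning when the serialized size of a task exceeds this many KiB.")
    .intConf
    .createWithDefault(100)
}
{code}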



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21366) Add sql test for window functions

2017-07-10 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-21366:


 Summary: Add sql test for window functions
 Key: SPARK-21366
 URL: https://issues.apache.org/jira/browse/SPARK-21366
 Project: Spark
  Issue Type: Task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19451) rangeBetween method should accept Long value as boundary

2017-07-05 Thread Jiang Xingbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-19451:
-
Summary: rangeBetween method should accept Long value as boundary  (was: 
Long values in Window function)

> rangeBetween method should accept Long value as boundary
> 
>
> Key: SPARK-19451
> URL: https://issues.apache.org/jira/browse/SPARK-19451
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.1, 2.0.2
>Reporter: Julien Champ
>
> Hi there,
> there seems to be a major limitation in Spark window functions and the 
> rangeBetween method.
> If I have the following code:
> {code:title=Exemple |borderStyle=solid}
> val tw =  Window.orderBy("date")
>   .partitionBy("id")
>   .rangeBetween( from , 0)
> {code}
> Everything seems OK as long as the *from* value is not too large, even though 
> the rangeBetween() method supports Long parameters.
> But if I set *from* to *-216000L* it does not work!
> It is probably related to this part of the code in the between() method of the 
> WindowSpec class, called by rangeBetween():
> {code:title=between() method|borderStyle=solid}
> val boundaryStart = start match {
>   case 0 => CurrentRow
>   case Long.MinValue => UnboundedPreceding
>   case x if x < 0 => ValuePreceding(-start.toInt)
>   case x if x > 0 => ValueFollowing(start.toInt)
> }
> {code}
> (look at the *.toInt*)
> Does anybody know if there's a way to solve / patch this behavior?
> Any help will be appreciated
> Thx
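
The root cause is the silent Long-to-Int truncation in the snippet above. A tiny standalone sketch of that truncation (the boundary value below is illustrative, not the reporter's exact one):
{code}
object RangeBoundaryTruncationDemo {
  def main(args: Array[String]): Unit = {
    // between() converted the Long boundary with .toInt, which keeps only the
    // low 32 bits, so large window ranges silently become wrong offsets.
    val start = -3000000000L        // illustrative boundary outside the Int range
    println((-start).toInt)         // prints -1294967296, not 3000000000
  }
}
{code}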



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21260) Remove the unused OutputFakerExec

2017-06-29 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-21260:


 Summary: Remove the unused OutputFakerExec
 Key: SPARK-21260
 URL: https://issues.apache.org/jira/browse/SPARK-21260
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.1.1
Reporter: Jiang Xingbo
Priority: Minor


OutputFakerExec was added long ago and is not used anywhere now so we should 
remove it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21225) decrease the Mem using for variable 'tasks' in function resourceOffers

2017-06-28 Thread Jiang Xingbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-21225:
-
Issue Type: Bug  (was: Improvement)

> decrease the Mem using for variable 'tasks' in function resourceOffers
> --
>
> Key: SPARK-21225
> URL: https://issues.apache.org/jira/browse/SPARK-21225
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.1.1
>Reporter: yangZhiguo
>Priority: Minor
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> In the function 'resourceOffers', a variable 'tasks' is declared to store 
> the tasks that have been allocated an executor. It is declared like this:
> *{color:#d04437}val tasks = shuffledOffers.map(o => new 
> ArrayBuffer[TaskDescription](o.cores)){color}*
> But this code only considers the situation of one task per core. 
> If the user configures "spark.task.cpus" as 2 or 3, it really doesn't need so 
> much space. I think it can be modified as follows:
> {color:#14892c}*val tasks = shuffledOffers.map(o => new 
> ArrayBuffer[TaskDescription](Math.ceil(o.cores*1.0/CPUS_PER_TASK).toInt))*{color}
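
For illustration only, the difference in initial buffer capacity between the two sizings above, using hypothetical numbers:
{code}
object TaskBufferSizingDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical offer: 32 free cores on an executor, spark.task.cpus = 4.
    val cores = 32
    val cpusPerTask = 4
    println(cores)                                        // current sizing: 32 slots
    println(math.ceil(cores * 1.0 / cpusPerTask).toInt)   // proposed sizing: 8 slots
  }
}
{code}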



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18294) Implement commit protocol to support `mapred` package's committer

2017-06-12 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16047229#comment-16047229
 ] 

Jiang Xingbo commented on SPARK-18294:
--

This is actually legacy code refactoring; it shouldn't affect common use cases 
because the old code is still valid. Could you expand on why you need this?

> Implement commit protocol to support `mapred` package's committer
> -
>
> Key: SPARK-18294
> URL: https://issues.apache.org/jira/browse/SPARK-18294
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Jiang Xingbo
>
> The current `FileCommitProtocol` is based on the `mapreduce` package; we should 
> implement a `HadoopMapRedCommitProtocol` that supports the older `mapred` 
> package's committer.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20989) Fail to start multiple workers on one host if external shuffle service is enabled in standalone mode

2017-06-05 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-20989:


 Summary: Fail to start multiple workers on one host if external 
shuffle service is enabled in standalone mode
 Key: SPARK-20989
 URL: https://issues.apache.org/jira/browse/SPARK-20989
 Project: Spark
  Issue Type: Bug
  Components: Deploy, Spark Core
Affects Versions: 2.1.1
Reporter: Jiang Xingbo


In standalone mode, if we enable the external shuffle service by setting 
`spark.shuffle.service.enabled` to true, and then try to start multiple 
workers on one host (by setting `SPARK_WORKER_INSTANCES=3` in spark-env.sh and 
then running `sbin/start-slaves.sh`), we can only launch one worker on each 
host successfully and the rest of the workers fail to launch.

The reason is that the port of the external shuffle service is configured by 
`spark.shuffle.service.port`, so currently we can start no more than one 
external shuffle service on each host. In our case, each worker tries to start 
an external shuffle service, and only one of them succeeds in doing this.
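
A generic illustration of the root cause, nothing Spark-specific: the second process that tries to bind the single configured shuffle-service port simply fails. 7337 is the default value of spark.shuffle.service.port:
{code}
import java.net.{BindException, ServerSocket}

object ShuffleServicePortConflictDemo {
  def main(args: Array[String]): Unit = {
    val port = 7337                    // default spark.shuffle.service.port
    val first = new ServerSocket(port) // first worker's shuffle service binds fine
    try {
      new ServerSocket(port)           // second worker on the same host fails here
    } catch {
      case e: BindException => println(s"second bind failed: ${e.getMessage}")
    } finally {
      first.close()
    }
  }
}
{code}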



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20832) Standalone master should explicitly inform drivers of worker deaths and invalidate external shuffle service outputs

2017-05-30 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16029778#comment-16029778
 ] 

Jiang Xingbo commented on SPARK-20832:
--

I'm working on this.

> Standalone master should explicitly inform drivers of worker deaths and 
> invalidate external shuffle service outputs
> ---
>
> Key: SPARK-20832
> URL: https://issues.apache.org/jira/browse/SPARK-20832
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, Scheduler
>Affects Versions: 2.0.0
>Reporter: Josh Rosen
>
> In SPARK-17370 (a patch authored by [~ekhliang] and reviewed by me), we added 
> logic to the DAGScheduler to mark external shuffle service instances as 
> unavailable upon task failure when the task failure reason was "SlaveLost" 
> and this was known to be caused by worker death. If the Spark Master 
> discovered that a worker was dead then it would notify any drivers with 
> executors on those workers to mark those executors as dead. The linked patch 
> simply piggybacked on this logic to have the executor death notification also 
> imply worker death and to have worker-death-caused-executor-death imply 
> shuffle file loss.
> However, there are modes of external shuffle service loss which this 
> mechanism does not detect, leaving the system prone to race conditions. Consider 
> the following:
> * Spark standalone is configured to run an external shuffle service embedded 
> in the Worker.
> * Application has shuffle outputs and executors on Worker A.
> * Stage depending on outputs of tasks that ran on Worker A starts.
> * All executors on worker A are removed due to dying with exceptions, 
> scaling-down via the dynamic allocation APIs, but _not_ due to worker death. 
> Worker A is still healthy at this point.
> * At this point the MapOutputTracker still records map output locations on 
> Worker A's shuffle service. This is expected behavior. 
> * Worker A dies at an instant where the application has no executors running 
> on it.
> * The Master knows that Worker A died but does not inform the driver (which 
> had no executors on that worker at the time of its death).
> * Some task from the running stage attempts to fetch map outputs from Worker 
> A but these requests time out because Worker A's shuffle service isn't 
> available.
> * Due to other logic in the scheduler, these preventable FetchFailures don't 
> wind up invalidating the now-unavailable map output locations (this is 
> a distinct bug / behavior which I'll discuss in a separate JIRA ticket).
> * This behavior leads to several unsuccessful stage reattempts and ultimately 
> to a job failure.
> A simple way to address this would be to have the Master explicitly notify 
> drivers of all Worker deaths, even if those drivers don't currently have 
> executors. The Spark Standalone scheduler backend can receive the explicit 
> WorkerLost message and can bubble up the right calls to the task scheduler 
> and DAGScheduler to invalidate map output locations from the now-dead 
> external shuffle service.
> This relates to SPARK-20115 in the sense that both tickets aim to address 
> issues where the external shuffle service is unavailable. The key difference 
> is the mechanism for detection: SPARK-20115 marks the external shuffle 
> service as unavailable whenever any fetch failure occurs from it, whereas the 
> proposal here relies on more explicit signals. This JIRA ticket's proposal is 
> scoped only to Spark Standalone mode. As a compromise, we might be able to 
> consider "all of a single shuffle's outputs lost on a single external shuffle 
> service" following a fetch failure (to be discussed in separate JIRA). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)

2017-05-16 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013199#comment-16013199
 ] 

Jiang Xingbo edited comment on SPARK-20700 at 5/16/17 10:23 PM:


In the previous approach we used `aliasMap` to link an `Attribute` to an 
expression potentially of the form `f(a, b)`, but we only searched 
`expressions` and `children.expressions` for this, which is not enough when an 
`Alias` lies deep in the logical plan. In that case, we can't generate the 
valid equivalent constraint classes and thus we fail at preventing the 
recursive deductions.

I'll send a PR to fix this later today.


was (Author: jiangxb1987):
In the previous approach we used `aliasMap` to link an `Attribute` to an 
expression potentially of the form `f(a, b)`, but we only searched 
`expressions` and `children.expressions` for this, which is not enough when an 
`Alias` lies deep in the logical plan. In that case, we can't generate the 
valid equivalent constraint classes and thus we fail to prevent the recursive 
deductions.

I'll send a PR to fix this later today.

> InferFiltersFromConstraints stackoverflows for query (v2)
> -
>
> Key: SPARK-20700
> URL: https://issues.apache.org/jira/browse/SPARK-20700
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Josh Rosen
>Assignee: Jiang Xingbo
>
> The following (complicated) query eventually fails with a stack overflow 
> during optimization:
> {code}
> CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, 
> int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES
>   ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', 
> TIMESTAMP('2015-01-14 00:00:00.0'), '947'),
>   ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', 
> TIMESTAMP('1999-08-15 00:00:00.0'), '437'),
>   ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', 
> TIMESTAMP('1991-05-23 00:00:00.0'), '630'),
>   ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), 
> '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'),
>   ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS 
> STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'),
>   ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', 
> CAST(NULL AS TIMESTAMP), '-740'),
>   ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL 
> AS TIMESTAMP), CAST(NULL AS STRING)),
>   ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', 
> CAST(NULL AS TIMESTAMP), '181'),
>   ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', 
> TIMESTAMP('2016-06-30 00:00:00.0'), '487'),
>   ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS 
> STRING), CAST(NULL AS TIMESTAMP), '-62');
> CREATE VIEW bools(a, b) as values (1, true), (1, true), (1, null);
> SELECT
> AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 PRECEDING ) AS 
> float_col,
> COUNT(t1.smallint_col_2) AS int_col
> FROM table_5 t1
> INNER JOIN (
> SELECT
> (MIN(-83) OVER (PARTITION BY t2.a ORDER BY t2.a, (t1.int_col_4) * 
> (t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) 
> AS boolean_col,
> t2.a,
> (t1.int_col_4) * (t1.int_col_4) AS int_col
> FROM table_5 t1
> LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4)
> WHERE
> (t1.smallint_col_2) > (t1.smallint_col_2)
> GROUP BY
> t2.a,
> (t1.int_col_4) * (t1.int_col_4)
> HAVING
> ((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * (t1.int_col_4), 
> SUM(t1.int_col_4))
> ) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = (t1.int_col_4))) AND 
> ((t2.a) = (t1.smallint_col_2));
> {code}
> (I haven't tried to minimize this failing case yet).
> Based on sampled jstacks from the driver, it looks like the query might be 
> repeatedly inferring filters from constraints and then pruning those filters.
> Here's part of the stack at the point where it stackoverflows:
> {code}
> [... repeats ...]
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> 

[jira] [Commented] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)

2017-05-16 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013199#comment-16013199
 ] 

Jiang Xingbo commented on SPARK-20700:
--

In the previous approach we used `aliasMap` to link an `Attribute` to an 
expression potentially of the form `f(a, b)`, but we only searched 
`expressions` and `children.expressions` for this, which is not enough when an 
`Alias` lies deep in the logical plan. In that case, we can't generate the 
valid equivalent constraint classes and thus we fail to prevent the recursive 
deductions.

I'll send a PR to fix this later today.
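
A rough sketch of the idea (not the actual patch): collect aliases from every node of the plan rather than only from `expressions` and `children.expressions`, so an `Alias` buried deep in the plan still contributes to the alias map. The object and method names here are mine, for illustration only:
{code}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object AliasMapSketch {
  // Walk the whole logical plan and map every aliased attribute to the
  // expression it stands for, so equivalence classes can be built even when
  // the Alias is not in the top two levels of the plan.
  def collectAliasMap(plan: LogicalPlan): Map[Attribute, Expression] = {
    plan.collect { case node =>
      node.expressions.flatMap(_.collect { case a: Alias => a.toAttribute -> a.child })
    }.flatten.toMap
  }
}
{code}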

> InferFiltersFromConstraints stackoverflows for query (v2)
> -
>
> Key: SPARK-20700
> URL: https://issues.apache.org/jira/browse/SPARK-20700
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Josh Rosen
>Assignee: Jiang Xingbo
>
> The following (complicated) query eventually fails with a stack overflow 
> during optimization:
> {code}
> CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, 
> int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES
>   ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', 
> TIMESTAMP('2015-01-14 00:00:00.0'), '947'),
>   ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', 
> TIMESTAMP('1999-08-15 00:00:00.0'), '437'),
>   ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', 
> TIMESTAMP('1991-05-23 00:00:00.0'), '630'),
>   ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), 
> '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'),
>   ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS 
> STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'),
>   ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', 
> CAST(NULL AS TIMESTAMP), '-740'),
>   ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL 
> AS TIMESTAMP), CAST(NULL AS STRING)),
>   ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', 
> CAST(NULL AS TIMESTAMP), '181'),
>   ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', 
> TIMESTAMP('2016-06-30 00:00:00.0'), '487'),
>   ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS 
> STRING), CAST(NULL AS TIMESTAMP), '-62');
> CREATE VIEW bools(a, b) as values (1, true), (1, true), (1, null);
> SELECT
> AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 PRECEDING ) AS 
> float_col,
> COUNT(t1.smallint_col_2) AS int_col
> FROM table_5 t1
> INNER JOIN (
> SELECT
> (MIN(-83) OVER (PARTITION BY t2.a ORDER BY t2.a, (t1.int_col_4) * 
> (t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) 
> AS boolean_col,
> t2.a,
> (t1.int_col_4) * (t1.int_col_4) AS int_col
> FROM table_5 t1
> LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4)
> WHERE
> (t1.smallint_col_2) > (t1.smallint_col_2)
> GROUP BY
> t2.a,
> (t1.int_col_4) * (t1.int_col_4)
> HAVING
> ((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * (t1.int_col_4), 
> SUM(t1.int_col_4))
> ) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = (t1.int_col_4))) AND 
> ((t2.a) = (t1.smallint_col_2));
> {code}
> (I haven't tried to minimize this failing case yet).
> Based on sampled jstacks from the driver, it looks like the query might be 
> repeatedly inferring filters from constraints and then pruning those filters.
> Here's part of the stack at the point where it stackoverflows:
> {code}
> [... repeats ...]
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.immutable.List.flatMap(List.scala:344)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50)
> at 
> 

[jira] [Commented] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)

2017-05-12 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16008286#comment-16008286
 ] 

Jiang Xingbo commented on SPARK-20700:
--

I've reproduced this case, will dive further into it this weekend.

> InferFiltersFromConstraints stackoverflows for query (v2)
> -
>
> Key: SPARK-20700
> URL: https://issues.apache.org/jira/browse/SPARK-20700
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Josh Rosen
>
> The following (complicated) query eventually fails with a stack overflow 
> during optimization:
> {code}
> CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, 
> int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES
>   ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', 
> TIMESTAMP('2015-01-14 00:00:00.0'), '947'),
>   ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', 
> TIMESTAMP('1999-08-15 00:00:00.0'), '437'),
>   ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', 
> TIMESTAMP('1991-05-23 00:00:00.0'), '630'),
>   ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), 
> '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'),
>   ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS 
> STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'),
>   ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', 
> CAST(NULL AS TIMESTAMP), '-740'),
>   ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL 
> AS TIMESTAMP), CAST(NULL AS STRING)),
>   ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', 
> CAST(NULL AS TIMESTAMP), '181'),
>   ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', 
> TIMESTAMP('2016-06-30 00:00:00.0'), '487'),
>   ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS 
> STRING), CAST(NULL AS TIMESTAMP), '-62');
> CREATE VIEW bools(a, b) as values (1, true), (1, true), (1, null);
> SELECT
> AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 PRECEDING ) AS 
> float_col,
> COUNT(t1.smallint_col_2) AS int_col
> FROM table_5 t1
> INNER JOIN (
> SELECT
> (MIN(-83) OVER (PARTITION BY t2.a ORDER BY t2.a, (t1.int_col_4) * 
> (t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) 
> AS boolean_col,
> t2.a,
> (t1.int_col_4) * (t1.int_col_4) AS int_col
> FROM table_5 t1
> LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4)
> WHERE
> (t1.smallint_col_2) > (t1.smallint_col_2)
> GROUP BY
> t2.a,
> (t1.int_col_4) * (t1.int_col_4)
> HAVING
> ((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * (t1.int_col_4), 
> SUM(t1.int_col_4))
> ) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = (t1.int_col_4))) AND 
> ((t2.a) = (t1.smallint_col_2));
> {code}
> (I haven't tried to minimize this failing case yet).
> Based on sampled jstacks from the driver, it looks like the query might be 
> repeatedly inferring filters from constraints and then pruning those filters.
> Here's part of the stack at the point where it stackoverflows:
> {code}
> [... repeats ...]
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.immutable.List.flatMap(List.scala:344)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> 

[jira] [Issue Comment Deleted] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)

2017-05-12 Thread Jiang Xingbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-20700:
-
Comment: was deleted

(was: I couldn't reproduce the failure on current master branch, the test case 
I use is like the following:
{code}
test("SPARK-20700: InferFiltersFromConstraints stackoverflows for query") {
withTempView("table_5") {
  withView("bools") {
sql(
  """CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, 
float_col_3, int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES
|  ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, 
'571', TIMESTAMP('2015-01-14 00:00:00.0'), '947'),
|  ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, 
'-278', TIMESTAMP('1999-08-15 00:00:00.0'), '437'),
|  ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', 
TIMESTAMP('1991-05-23 00:00:00.0'), '630'),
|  ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), 
CAST(NULL AS INT), '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'),
|  ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, 
CAST(NULL AS STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'),
|  ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, 
'330', CAST(NULL AS TIMESTAMP), '-740'),
|  ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, 
'-766', CAST(NULL AS TIMESTAMP), CAST(NULL AS STRING)),
|  ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, 
'-514', CAST(NULL AS TIMESTAMP), '181'),
|  ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', 
TIMESTAMP('2016-06-30 00:00:00.0'), '487'),
|  ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, 
CAST(NULL AS STRING), CAST(NULL AS TIMESTAMP), '-62')
  """.
stripMargin)
sql("CREATE VIEW bools(a, b) as values (1, true), (1, true), (1, null)")

sql(
  """
  SELECT
|AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 
PRECEDING ) AS float_col,
|COUNT(t1.smallint_col_2) AS int_col
|FROM table_5 t1
|INNER JOIN (
|SELECT
|(MIN(-83) OVER (PARTITION BY t2.a ORDER BY t2.a, (t1.int_col_4) * 
(t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) 
AS boolean_col,
|t2.a,
|(t1.int_col_4) * (t1.int_col_4) AS int_col
|FROM table_5 t1
|LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4)
|WHERE
|(t1.smallint_col_2) > (t1.smallint_col_2)
|GROUP BY
|t2.a,
|(t1.int_col_4) * (t1.int_col_4)
|HAVING
|((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * 
(t1.int_col_4), SUM(t1.int_col_4))
|) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = 
(t1.int_col_4))) AND ((t2.a) = (t1.smallint_col_2))
""".stripMargin)
  }
}
  }
{code})

> InferFiltersFromConstraints stackoverflows for query (v2)
> -
>
> Key: SPARK-20700
> URL: https://issues.apache.org/jira/browse/SPARK-20700
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Josh Rosen
>
> The following (complicated) query eventually fails with a stack overflow 
> during optimization:
> {code}
> CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, 
> int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES
>   ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', 
> TIMESTAMP('2015-01-14 00:00:00.0'), '947'),
>   ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', 
> TIMESTAMP('1999-08-15 00:00:00.0'), '437'),
>   ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', 
> TIMESTAMP('1991-05-23 00:00:00.0'), '630'),
>   ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), 
> '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'),
>   ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS 
> STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'),
>   ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', 
> CAST(NULL AS TIMESTAMP), '-740'),
>   ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL 
> AS TIMESTAMP), CAST(NULL AS STRING)),
>   ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', 
> CAST(NULL AS TIMESTAMP), '181'),
>   ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', 
> TIMESTAMP('2016-06-30 00:00:00.0'), '487'),
>   ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS 
> STRING), CAST(NULL AS TIMESTAMP), '-62');
> CREATE VIEW bools(a, b) as 

[jira] [Commented] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)

2017-05-12 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16007731#comment-16007731
 ] 

Jiang Xingbo commented on SPARK-20700:
--

I couldn't reproduce the failure on current master branch, the test case I use 
is like the following:
{code}
test("SPARK-20700: InferFiltersFromConstraints stackoverflows for query") {
withTempView("table_5") {
  withView("bools") {
sql(
  """CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, 
float_col_3, int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES
|  ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, 
'571', TIMESTAMP('2015-01-14 00:00:00.0'), '947'),
|  ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, 
'-278', TIMESTAMP('1999-08-15 00:00:00.0'), '437'),
|  ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', 
TIMESTAMP('1991-05-23 00:00:00.0'), '630'),
|  ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), 
CAST(NULL AS INT), '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'),
|  ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, 
CAST(NULL AS STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'),
|  ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, 
'330', CAST(NULL AS TIMESTAMP), '-740'),
|  ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, 
'-766', CAST(NULL AS TIMESTAMP), CAST(NULL AS STRING)),
|  ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, 
'-514', CAST(NULL AS TIMESTAMP), '181'),
|  ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', 
TIMESTAMP('2016-06-30 00:00:00.0'), '487'),
|  ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, 
CAST(NULL AS STRING), CAST(NULL AS TIMESTAMP), '-62')
  """.
stripMargin)
sql("CREATE VIEW bools(a, b) as values (1, true), (1, true), (1, null)")

sql(
  """
  SELECT
|AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 
PRECEDING ) AS float_col,
|COUNT(t1.smallint_col_2) AS int_col
|FROM table_5 t1
|INNER JOIN (
|SELECT
|(MIN(-83) OVER (PARTITION BY t2.a ORDER BY t2.a, (t1.int_col_4) * 
(t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) 
AS boolean_col,
|t2.a,
|(t1.int_col_4) * (t1.int_col_4) AS int_col
|FROM table_5 t1
|LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4)
|WHERE
|(t1.smallint_col_2) > (t1.smallint_col_2)
|GROUP BY
|t2.a,
|(t1.int_col_4) * (t1.int_col_4)
|HAVING
|((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * 
(t1.int_col_4), SUM(t1.int_col_4))
|) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = 
(t1.int_col_4))) AND ((t2.a) = (t1.smallint_col_2))
""".stripMargin)
  }
}
  }
{code}

> InferFiltersFromConstraints stackoverflows for query (v2)
> -
>
> Key: SPARK-20700
> URL: https://issues.apache.org/jira/browse/SPARK-20700
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Josh Rosen
>
> The following (complicated) query eventually fails with a stack overflow 
> during optimization:
> {code}
> CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, 
> int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES
>   ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', 
> TIMESTAMP('2015-01-14 00:00:00.0'), '947'),
>   ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', 
> TIMESTAMP('1999-08-15 00:00:00.0'), '437'),
>   ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', 
> TIMESTAMP('1991-05-23 00:00:00.0'), '630'),
>   ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), 
> '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'),
>   ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS 
> STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'),
>   ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', 
> CAST(NULL AS TIMESTAMP), '-740'),
>   ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL 
> AS TIMESTAMP), CAST(NULL AS STRING)),
>   ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', 
> CAST(NULL AS TIMESTAMP), '181'),
>   ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', 
> TIMESTAMP('2016-06-30 00:00:00.0'), '487'),
>   ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS 
> STRING), CAST(NULL AS TIMESTAMP), '-62');
> CREATE VIEW 

[jira] [Commented] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)

2017-05-11 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006201#comment-16006201
 ] 

Jiang Xingbo commented on SPARK-20700:
--

I'm working on this, thank you! [~joshrosen]

> InferFiltersFromConstraints stackoverflows for query (v2)
> -
>
> Key: SPARK-20700
> URL: https://issues.apache.org/jira/browse/SPARK-20700
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Josh Rosen
>
> The following (complicated) query eventually fails with a stack overflow 
> during optimization:
> {code}
> CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, 
> int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES
>   ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', 
> TIMESTAMP('2015-01-14 00:00:00.0'), '947'),
>   ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', 
> TIMESTAMP('1999-08-15 00:00:00.0'), '437'),
>   ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', 
> TIMESTAMP('1991-05-23 00:00:00.0'), '630'),
>   ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), 
> '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'),
>   ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS 
> STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'),
>   ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', 
> CAST(NULL AS TIMESTAMP), '-740'),
>   ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL 
> AS TIMESTAMP), CAST(NULL AS STRING)),
>   ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', 
> CAST(NULL AS TIMESTAMP), '181'),
>   ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', 
> TIMESTAMP('2016-06-30 00:00:00.0'), '487'),
>   ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS 
> STRING), CAST(NULL AS TIMESTAMP), '-62');
> CREATE VIEW bools(a, b) as values (1, true), (1, true), (1, null);
> SELECT
> AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 PRECEDING ) AS 
> float_col,
> COUNT(t1.smallint_col_2) AS int_col
> FROM table_5 t1
> INNER JOIN (
> SELECT
> (MIN(-83) OVER (PARTITION BY t2.a ORDER BY t2.a, (t1.int_col_4) * 
> (t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) 
> AS boolean_col,
> t2.a,
> (t1.int_col_4) * (t1.int_col_4) AS int_col
> FROM table_5 t1
> LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4)
> WHERE
> (t1.smallint_col_2) > (t1.smallint_col_2)
> GROUP BY
> t2.a,
> (t1.int_col_4) * (t1.int_col_4)
> HAVING
> ((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * (t1.int_col_4), 
> SUM(t1.int_col_4))
> ) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = (t1.int_col_4))) AND 
> ((t2.a) = (t1.smallint_col_2));
> {code}
> (I haven't tried to minimize this failing case yet).
> Based on sampled jstacks from the driver, it looks like the query might be 
> repeatedly inferring filters from constraints and then pruning those filters.
> Here's part of the stack at the point where it stackoverflows:
> {code}
> [... repeats ...]
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.immutable.List.flatMap(List.scala:344)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> 

[jira] [Commented] (SPARK-20680) Spark-sql do not support for void column datatype of view

2017-05-11 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006169#comment-16006169
 ] 

Jiang Xingbo commented on SPARK-20680:
--

[~hvanhovell] Sure, I'll look at this issue.

> Spark-sql do not support for void column datatype of view
> -
>
> Key: SPARK-20680
> URL: https://issues.apache.org/jira/browse/SPARK-20680
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Lantao Jin
>
> Create a Hive table:
> {quote}
> hive> create table bad as select 1 x, null z from dual;
> {quote}
> Because there's no type, Hive gives it the VOID type:
> {quote}
> hive> describe bad;
> OK
> x int 
> z void
> {quote}
> In Spark2.0.x, the behaviour to read this view is normal:
> {quote}
> spark-sql> describe bad;
> x   int NULL
> z   voidNULL
> Time taken: 4.431 seconds, Fetched 2 row(s)
> {quote}
> But in Spark2.1.x, it failed with SparkException: Cannot recognize hive type 
> string: void
> {quote}
> spark-sql> describe bad;
> 17/05/09 03:12:08 INFO execution.SparkSqlParser: Parsing command: describe bad
> 17/05/09 03:12:08 INFO parser.CatalystSqlParser: Parsing command: int
> 17/05/09 03:12:08 INFO parser.CatalystSqlParser: Parsing command: void
> 17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad]
> org.apache.spark.SparkException: Cannot recognize hive type string: void
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl.org$apache$spark$sql$hive$client$HiveClientImpl$$fromHiveColumn(HiveClientImpl.scala:789)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365)
>   
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365)
>   
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:365)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:361)
> Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
> DataType void() is not supported.(line 1, pos 0)
> == SQL ==  
> void   
> ^^^
> ... 61 more
> org.apache.spark.SparkException: Cannot recognize hive type string: void
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20236) Overwrite a partitioned table should only overwrite related partitions

2017-04-06 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958448#comment-15958448
 ] 

Jiang Xingbo commented on SPARK-20236:
--

I'm working on this.

> Overwrite a partitioned table should only overwrite related partitions
> --
>
> Key: SPARK-20236
> URL: https://issues.apache.org/jira/browse/SPARK-20236
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Wenchen Fan
>
> When we overwrite a partitioned table, currently Spark will truncate the 
> entire table to write new data, or truncate a bunch of partitions according 
> to the given static partitions.
> For example, {{INSERT OVERWRITE tbl ...}} will truncate the entire table, 
> {{INSERT OVERWRITE tbl PARTITION (a=1, b)}} will truncate all the partitions 
> that starts with {{a=1}}.
> This behavior is kind of reasonable as we can know which partitions will be 
> overwritten before runtime. However, hive has a different behavior that it 
> only overwrites related partitions, e.g. {{INSERT OVERWRITE tbl SELECT 
> 1,2,3}} will only overwrite partition {{a=2, b=3}}, assuming {{tbl}} has only 
> one data column and is partitioned by {{a}} and {{b}}.
> It seems better if we can follow hive's behavior.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19960) Move `SparkHadoopWriter` to `internal/io/`

2017-03-15 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-19960:


 Summary: Move `SparkHadoopWriter` to `internal/io/`
 Key: SPARK-19960
 URL: https://issues.apache.org/jira/browse/SPARK-19960
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Jiang Xingbo


We should move `SparkHadoopWriter` to `internal/io/`; that will make it easier 
to consolidate `SparkHadoopWriter` and `SparkHadoopMapReduceWriter`.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19877) Restrict the nested level of a view

2017-03-10 Thread Jiang Xingbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xingbo updated SPARK-19877:
-
Summary: Restrict the nested level of a view  (was: Restrict the depth of 
view reference chains)

> Restrict the nested level of a view
> ---
>
> Key: SPARK-19877
> URL: https://issues.apache.org/jira/browse/SPARK-19877
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Jiang Xingbo
>
> We should restrict the depth of view reference chains, to avoid stack 
> overflow exceptions during resolution of nested views.
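
A rough sketch of the kind of guard this asks for; all names and the limit are illustrative, and the real fix would live inside the analyzer's view resolution:
{code}
object ViewDepthGuardSketch {
  val maxNestedViewDepth = 100 // illustrative limit

  // Fail fast with a clear error instead of hitting a StackOverflowError
  // when view definitions are nested too deeply. `refs` maps a view name to
  // the names of the views its definition references.
  def resolveView(name: String, depth: Int, refs: Map[String, Seq[String]]): Unit = {
    if (depth > maxNestedViewDepth) {
      throw new IllegalStateException(
        s"View $name exceeds the maximum allowed view resolution depth ($maxNestedViewDepth).")
    }
    refs.getOrElse(name, Nil).foreach(child => resolveView(child, depth + 1, refs))
  }
}
{code}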



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19877) Restrict the depth of view reference chains

2017-03-08 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-19877:


 Summary: Restrict the depth of view reference chains
 Key: SPARK-19877
 URL: https://issues.apache.org/jira/browse/SPARK-19877
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.2.0
Reporter: Jiang Xingbo


We should restrict the depth of view reference chains, to avoid stack 
overflow exceptions during resolution of nested views.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19877) Restrict the depth of view reference chains

2017-03-08 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902324#comment-15902324
 ] 

Jiang Xingbo commented on SPARK-19877:
--

I'm working on this.

> Restrict the depth of view reference chains
> ---
>
> Key: SPARK-19877
> URL: https://issues.apache.org/jira/browse/SPARK-19877
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Jiang Xingbo
>
> We should restrict the depth of view reference chains, to avoid stack 
> overflow exceptions during resolution of nested views.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18389) Disallow cyclic view reference

2017-03-01 Thread Jiang Xingbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891575#comment-15891575
 ] 

Jiang Xingbo commented on SPARK-18389:
--

I've just figured out a way to work this out, and will try to submit a PR later.

> Disallow cyclic view reference
> --
>
> Key: SPARK-18389
> URL: https://issues.apache.org/jira/browse/SPARK-18389
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>
> The following should not be allowed:
> {code}
> CREATE VIEW testView AS SELECT id FROM jt
> CREATE VIEW testView2 AS SELECT id FROM testView
> ALTER VIEW testView AS SELECT * FROM testView2
> {code}
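
A standalone sketch of the check this needs. The dependency map below is hypothetical; in Spark the view references would come from the catalog during resolution:
{code}
object ViewCycleCheckSketch {
  // Returns true if altering `view` to depend on `newDeps` would make `view`
  // reachable from itself through the view dependency graph.
  def createsCycle(view: String, newDeps: Set[String], deps: Map[String, Set[String]]): Boolean = {
    def reachable(from: String, seen: Set[String]): Set[String] =
      if (seen.contains(from)) seen
      else deps.getOrElse(from, Set.empty[String]).foldLeft(seen + from)((acc, d) => reachable(d, acc))
    newDeps.exists(d => reachable(d, Set.empty).contains(view))
  }

  def main(args: Array[String]): Unit = {
    val deps = Map("testView" -> Set("jt"), "testView2" -> Set("testView"))
    // ALTER VIEW testView AS SELECT * FROM testView2 would introduce a cycle:
    println(createsCycle("testView", Set("testView2"), deps)) // true
  }
}
{code}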



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


