[jira] [Created] (SPARK-25085) Insert overwrite a non-partitioned table can delete table folder

2018-08-10 Thread Rui Li (JIRA)
Rui Li created SPARK-25085:
--

 Summary: Insert overwrite a non-partitioned table can delete table 
folder
 Key: SPARK-25085
 URL: https://issues.apache.org/jira/browse/SPARK-25085
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Rui Li


When performing an insert overwrite on a data source table, Spark first deletes
all the partitions. For a non-partitioned table, it deletes the table folder
itself, which is wrong because the table folder may contain information such as
ACL entries.
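
A hypothetical reproduction sketch (the table name and warehouse path below are
made up for illustration; the actual location depends on the configuration):
{code}
create table test_ds(x int) using parquet;

-- suppose an admin then adds ACL entries on the table folder, e.g.
--   hdfs dfs -setfacl -m user:bob:r-x /user/hive/warehouse/test_ds

insert overwrite table test_ds values (1),(2);
-- the folder /user/hive/warehouse/test_ds itself is deleted and recreated,
-- so the ACL entries added above are lost
{code}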






[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again

2018-06-11 Thread Rui Li (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509214#comment-16509214
 ] 

Rui Li commented on SPARK-24387:


Yes, blacklisting can be used to avoid the issue. But blacklisting can be turned
off, or configured to be more tolerant, so it would be better to have a more
reliable solution.
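
For reference, a sketch of the relevant blacklist settings (property names as in
Spark 2.x; values are illustrative):
{code}
// Blacklisting is disabled by default, and even when enabled it tolerates a
// configurable number of failures per executor, so it cannot be relied on to
// keep the retried task off a heartbeat-timeout executor.
val conf = new org.apache.spark.SparkConf()
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
{code}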

> Heartbeat-timeout executor is added back and used again
> ---
>
> Key: SPARK-24387
> URL: https://issues.apache.org/jira/browse/SPARK-24387
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> In our job, when there's only one task and one executor running, the 
> executor's heartbeat is lost and driver decides to remove it. However, the 
> executor is added again and the task's retry attempt is scheduled to that 
> executor, almost immediately after the executor is marked as lost.






[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again

2018-05-28 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492563#comment-16492563
 ] 

Rui Li commented on SPARK-24387:


Instead of letting HeartbeatReceiver tell TaskScheduler that the executor is
lost, I'm wondering whether it makes sense to let CoarseGrainedSchedulerBackend
call executorLost in its killExecutors method, at which point the executor has
already been marked as pending-to-remove and won't be offered again.
[~kayousterhout], [~vanzin], would you mind sharing your thoughts? Thanks.
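
A minimal, self-contained sketch of the idea (all types below are simplified
stand-ins, not the actual Spark classes):
{code}
import scala.collection.mutable

trait TaskSchedulerLike {
  def executorLost(executorId: String, reason: String): Unit
}

class BackendSketch(scheduler: TaskSchedulerLike) {
  private val executorsPendingToRemove = mutable.Set[String]()

  // executors in this set are never included in resource offers
  def canOffer(executorId: String): Boolean = !executorsPendingToRemove(executorId)

  def killExecutors(executorIds: Seq[String], reason: String): Unit = {
    executorsPendingToRemove ++= executorIds
    // proposal: report the loss here, after the executors have been marked
    // pending-to-remove, so a revived offer can no longer pick them up
    executorIds.foreach(id => scheduler.executorLost(id, reason))
    // ... then ask the cluster manager to actually kill the executors ...
  }
}
{code}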

> Heartbeat-timeout executor is added back and used again
> ---
>
> Key: SPARK-24387
> URL: https://issues.apache.org/jira/browse/SPARK-24387
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> In our job, when there's only one task and one executor running, the 
> executor's heartbeat is lost and driver decides to remove it. However, the 
> executor is added again and the task's retry attempt is scheduled to that 
> executor, almost immediately after the executor is marked as lost.






[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again

2018-05-25 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490309#comment-16490309
 ] 

Rui Li commented on SPARK-24387:


When HeartbeatReceiver finds that an executor's heartbeat has timed out, it
informs the TaskScheduler and kills the executor asynchronously. When the
TaskScheduler handles the lost executor, it tries to revive offers from the
backend. So I think there's a race condition: the backend may make offers before
the executor is actually killed. And since this is the only executor left, it's
offered to the TaskScheduler and the retried task is scheduled onto it.

Besides, when killing a heartbeat-timeout executor, we expect a replacement
executor to be launched. But by the time the new executor is launched, there's
no task left for it to run, so it sits idle until dynamic allocation kills it.

> Heartbeat-timeout executor is added back and used again
> ---
>
> Key: SPARK-24387
> URL: https://issues.apache.org/jira/browse/SPARK-24387
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> In our job, when there's only one task and one executor running, the 
> executor's heartbeat is lost and driver decides to remove it. However, the 
> executor is added again and the task's retry attempt is scheduled to that 
> executor, almost immediately after the executor is marked as lost.






[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again

2018-05-25 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490293#comment-16490293
 ] 

Rui Li commented on SPARK-24387:


A snippet of the log w/ some fields masked:
{noformat}
[Stage 2:==>(199 + 1) / 
200]18/05/20 05:37:07 WARN HeartbeatReceiver: Removing executor 1100 with no 
recent heartbeats: 345110 ms exceeds timeout 30 ms
18/05/20 05:37:07 ERROR YarnClusterScheduler: Lost executor 1100 on HOSTA: 
Executor heartbeat timed out after 345110 ms
18/05/20 05:37:07 WARN TaskSetManager: Lost task 55.0 in stage 2.0 (TID 12080, 
HOSTA, executor 1100): ExecutorLostFailure (executor 1100 exited caused by one 
of the running tasks) Reason: Executor heartbeat timed out after 345110 ms
18/05/20 05:37:07 INFO DAGScheduler: Executor lost: 1100 (epoch 2)
18/05/20 05:37:07 INFO DAGScheduler: Host added was in lost list earlier: HOSTA
18/05/20 05:37:07 INFO TaskSetManager: Starting task 55.1 in stage 2.0 (TID 
12225, HOSTA, executor 1100, partition 55, PROCESS_LOCAL, 6227 bytes)
{noformat}

> Heartbeat-timeout executor is added back and used again
> ---
>
> Key: SPARK-24387
> URL: https://issues.apache.org/jira/browse/SPARK-24387
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> In our job, when there's only one task and one executor running, the 
> executor's heartbeat is lost and driver decides to remove it. However, the 
> executor is added again and the task's retry attempt is scheduled to that 
> executor, almost immediately after the executor is marked as lost.






[jira] [Created] (SPARK-24387) Heartbeat-timeout executor is added back and used again

2018-05-25 Thread Rui Li (JIRA)
Rui Li created SPARK-24387:
--

 Summary: Heartbeat-timeout executor is added back and used again
 Key: SPARK-24387
 URL: https://issues.apache.org/jira/browse/SPARK-24387
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.1.0
Reporter: Rui Li


In our job, when there's only one task and one executor running, the executor's
heartbeat is lost and the driver decides to remove it. However, the executor is
added back and the task's retry attempt is scheduled to that executor, almost
immediately after the executor is marked as lost.






[jira] [Commented] (SPARK-24116) SparkSQL inserting overwrite table has inconsistent behavior regarding HDFS trash

2018-05-03 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462040#comment-16462040
 ] 

Rui Li commented on SPARK-24116:


To reproduce:
{code}
create table test_text(x int);

insert overwrite table test_text values (1),(2);

insert overwrite table test_text values (3),(4); -- the old data is moved to trash

create table test_parquet(x int) using parquet;

insert overwrite table test_parquet values (1),(2);

insert overwrite table test_parquet values (3),(4); -- the old data is not moved to trash
{code}

> SparkSQL inserting overwrite table has inconsistent behavior regarding HDFS 
> trash
> -
>
> Key: SPARK-24116
> URL: https://issues.apache.org/jira/browse/SPARK-24116
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> When inserting overwrite a table, the old data may or may not go to trash 
> based on:
>  # Data format. E.g. text table may go to trash but parquet table doesn't.
>  # Whether table is partitioned. E.g. partitioned text table doesn't go to 
> trash while non-partitioned table does.






[jira] [Commented] (SPARK-24116) SparkSQL inserting overwrite table has inconsistent behavior regarding HDFS trash

2018-05-01 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460472#comment-16460472
 ] 

Rui Li commented on SPARK-24116:


[~hyukjin.kwon], sorry for the late response. For example, assume we have two
non-partitioned tables, one a text table and the other a Parquet table. If we
insert overwrite the text table, the old data goes to HDFS trash. But if we
insert overwrite the Parquet table, the old data doesn't go to trash.
I believe SparkSQL has different code paths for loading data into different
kinds of tables, and whether the old data goes to trash is inconsistent among
these code paths. Specifically, {{Hive::loadTable}} moves the old data to trash,
but it seems other code paths simply delete it. Ideally SparkSQL would let the
user specify whether old data goes to trash when overwriting, something like
HIVE-15880.
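
For reference, a minimal sketch of what moving data to trash looks like at the
FileSystem level with the Hadoop Trash API (the path below is hypothetical):
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, Trash}

// move the old table data into the user's trash instead of deleting it;
// returns false if trash is disabled (fs.trash.interval = 0)
val conf = new Configuration()
val oldData = new Path("/user/hive/warehouse/test_parquet")
val fs = oldData.getFileSystem(conf)
val movedToTrash = Trash.moveToAppropriateTrash(fs, oldData, conf)
{code}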

> SparkSQL inserting overwrite table has inconsistent behavior regarding HDFS 
> trash
> -
>
> Key: SPARK-24116
> URL: https://issues.apache.org/jira/browse/SPARK-24116
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> When inserting overwrite a table, the old data may or may not go to trash 
> based on:
>  # Data format. E.g. text table may go to trash but parquet table doesn't.
>  # Whether table is partitioned. E.g. partitioned text table doesn't go to 
> trash while non-partitioned table does.






[jira] [Created] (SPARK-24116) SparkSQL inserting overwrite table has inconsistent behavior regarding HDFS trash

2018-04-27 Thread Rui Li (JIRA)
Rui Li created SPARK-24116:
--

 Summary: SparkSQL inserting overwrite table has inconsistent 
behavior regarding HDFS trash
 Key: SPARK-24116
 URL: https://issues.apache.org/jira/browse/SPARK-24116
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.0
Reporter: Rui Li


When insert overwrite is performed on a table, the old data may or may not go
to trash, depending on:
 # Data format. E.g. a text table may go to trash but a Parquet table doesn't.
 # Whether the table is partitioned. E.g. a partitioned text table doesn't go to
trash while a non-partitioned one does.






[jira] [Commented] (SPARK-24010) Select from table needs read access on DB folder when storage based auth is enabled

2018-04-21 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16446761#comment-16446761
 ] 

Rui Li commented on SPARK-24010:


[~rxin], thanks for your reply. I noticed that InMemoryCatalog even explicitly
requires the DB to exist when testing whether a table exists. If it works this
way by design, I guess we have to investigate whether we can make the Hive
metastore skip authorization when deciding whether a DB exists, and that
probably needs a new API. Do you have any suggestions for other possible
solutions?

> Select from table needs read access on DB folder when storage based auth is 
> enabled
> ---
>
> Key: SPARK-24010
> URL: https://issues.apache.org/jira/browse/SPARK-24010
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> When HMS enables storage based authorization, SparkSQL requires read access 
> on DB folder in order to select from a table. Such requirement doesn't seem 
> necessary and is not required in Hive.
> The reason is when Analyzer tries to resolve a relation, it calls 
> [SessionCatalog::databaseExists|https://github.com/apache/spark/blob/v2.1.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L469].
>  This will call the metastore get_database API which will perform 
> authorization check.






[jira] [Commented] (SPARK-24010) Select from table needs read access on DB folder when storage based auth is enabled

2018-04-18 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442146#comment-16442146
 ] 

Rui Li commented on SPARK-24010:


Hi [~rxin], I think the databaseExists check was added in SPARK-14869. Do you
remember why we need to check whether the DB exists? In
{{HiveClientImpl::tableExists}}, we're telling Hive not to throw exceptions, so
there shouldn't be any exception if the DB doesn't exist.
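
A rough sketch of that idea (not actual Spark code; it assumes {{tableExists}}
returns false rather than throwing when the DB is missing):
{code}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

// resolve the relation by checking the table directly, skipping the separate
// databaseExists check, so no metastore get_database call (and thus no storage
// based authorization check on the DB folder) is needed for a plain SELECT
def relationExists(catalog: SessionCatalog, db: String, table: String): Boolean =
  catalog.tableExists(TableIdentifier(table, Some(db)))
{code}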

> Select from table needs read access on DB folder when storage based auth is 
> enabled
> ---
>
> Key: SPARK-24010
> URL: https://issues.apache.org/jira/browse/SPARK-24010
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> When HMS enables storage based authorization, SparkSQL requires read access 
> on DB folder in order to select from a table. Such requirement doesn't seem 
> necessary and is not required in Hive.
> The reason is when Analyzer tries to resolve a relation, it calls 
> [SessionCatalog::databaseExists|https://github.com/apache/spark/blob/v2.1.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L469].
>  This will call the metastore get_database API which will perform 
> authorization check.






[jira] [Created] (SPARK-24010) Select from table needs read access on DB folder when storage based auth is enabled

2018-04-18 Thread Rui Li (JIRA)
Rui Li created SPARK-24010:
--

 Summary: Select from table needs read access on DB folder when 
storage based auth is enabled
 Key: SPARK-24010
 URL: https://issues.apache.org/jira/browse/SPARK-24010
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.0
Reporter: Rui Li


When HMS enables storage based authorization, SparkSQL requires read access on
the DB folder in order to select from a table. Such a requirement doesn't seem
necessary and is not imposed by Hive.
The reason is that when the Analyzer tries to resolve a relation, it calls
[SessionCatalog::databaseExists|https://github.com/apache/spark/blob/v2.1.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L469].
This in turn calls the metastore get_database API, which performs an
authorization check.






[jira] [Commented] (SPARK-14958) Failed task hangs if error is encountered when getting task result

2016-10-09 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561094#comment-15561094
 ] 

Rui Li commented on SPARK-14958:


I hit the issue with Spark 1.6.1

> Failed task hangs if error is encountered when getting task result
> --
>
> Key: SPARK-14958
> URL: https://issues.apache.org/jira/browse/SPARK-14958
> Project: Spark
>  Issue Type: Bug
>Reporter: Rui Li
>
> In {{TaskResultGetter}}, if we get an error when deserialize 
> {{TaskEndReason}}, TaskScheduler won't have a chance to handle the failed 
> task and the task just hangs.
> {code}
>   def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: 
> TaskState,
> serializedData: ByteBuffer) {
> var reason : TaskEndReason = UnknownReason
> try {
>   getTaskResultExecutor.execute(new Runnable {
> override def run(): Unit = Utils.logUncaughtExceptions {
>   val loader = Utils.getContextOrSparkClassLoader
>   try {
> if (serializedData != null && serializedData.limit() > 0) {
>   reason = serializer.get().deserialize[TaskEndReason](
> serializedData, loader)
> }
>   } catch {
> case cnd: ClassNotFoundException =>
>   // Log an error but keep going here -- the task failed, so not 
> catastrophic
>   // if we can't deserialize the reason.
>   logError(
> "Could not deserialize TaskEndReason: ClassNotFound with 
> classloader " + loader)
> case ex: Exception => {}
>   }
>   scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
> }
>   })
> } catch {
>   case e: RejectedExecutionException if sparkEnv.isStopped =>
> // ignore it
> }
>   }
> {code}
> In my specific case, I got a NoClassDefFoundError and the failed task hangs 
> forever.






[jira] [Created] (SPARK-14958) Failed task hangs if error is encountered when getting task result

2016-04-27 Thread Rui Li (JIRA)
Rui Li created SPARK-14958:
--

 Summary: Failed task hangs if error is encountered when getting 
task result
 Key: SPARK-14958
 URL: https://issues.apache.org/jira/browse/SPARK-14958
 Project: Spark
  Issue Type: Bug
Reporter: Rui Li


In {{TaskResultGetter}}, if we get an error when deserializing
{{TaskEndReason}}, the TaskScheduler won't have a chance to handle the failed
task and the task just hangs.
{code}
  def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
      serializedData: ByteBuffer) {
    var reason: TaskEndReason = UnknownReason
    try {
      getTaskResultExecutor.execute(new Runnable {
        override def run(): Unit = Utils.logUncaughtExceptions {
          val loader = Utils.getContextOrSparkClassLoader
          try {
            if (serializedData != null && serializedData.limit() > 0) {
              reason = serializer.get().deserialize[TaskEndReason](
                serializedData, loader)
            }
          } catch {
            case cnd: ClassNotFoundException =>
              // Log an error but keep going here -- the task failed, so not catastrophic
              // if we can't deserialize the reason.
              logError(
                "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
            case ex: Exception => {}
          }
          scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
        }
      })
    } catch {
      case e: RejectedExecutionException if sparkEnv.isStopped =>
        // ignore it
    }
  }
{code}
In my specific case, I got a NoClassDefFoundError, which is an Error rather
than an Exception, so it escapes both catch clauses,
{{scheduler.handleFailedTask}} is never invoked, and the failed task hangs
forever.
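
One possible direction, sketched against the snippet above (not an actual fix;
the surrounding class is unchanged): make sure {{handleFailedTask}} is always
reached, even if deserializing the reason throws an Error.
{code}
override def run(): Unit = Utils.logUncaughtExceptions {
  val loader = Utils.getContextOrSparkClassLoader
  try {
    if (serializedData != null && serializedData.limit() > 0) {
      reason = serializer.get().deserialize[TaskEndReason](serializedData, loader)
    }
  } catch {
    case t: Throwable =>
      // the task already failed, so losing the precise reason is acceptable
      logError("Could not deserialize TaskEndReason with classloader " + loader, t)
  } finally {
    // always give the scheduler a chance to handle the failed task
    scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
  }
}
{code}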






[jira] [Commented] (SPARK-4440) Enhance the job progress API to expose more information

2015-09-17 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14791732#comment-14791732
 ] 

Rui Li commented on SPARK-4440:
---

For Hive on Spark, we want the completion time for each stage so we can compute
how long the stage takes (there's already a submission time in
{{SparkStageInfo}}).
It'll be great if we can also get task metrics. Currently we have to implement a
SparkListener to collect metrics.

[~chengxiang li] and [~xuefuz], do you have anything to add?
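
For reference, a rough sketch of what we do today, i.e. register a listener to
collect stage timing and task metrics (the class name is made up; the listener
API used is the public one):
{code}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

class HiveOnSparkMetricsListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    for (s <- info.submissionTime; c <- info.completionTime) {
      println(s"Stage ${info.stageId} took ${c - s} ms")
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      println(s"Task ${taskEnd.taskInfo.taskId} executorRunTime=${metrics.executorRunTime} ms")
    }
  }
}

// sc.addSparkListener(new HiveOnSparkMetricsListener())
{code}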

> Enhance the job progress API to expose more information
> ---
>
> Key: SPARK-4440
> URL: https://issues.apache.org/jira/browse/SPARK-4440
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Rui Li
>
> The progress API introduced in SPARK-2321 provides a new way for user to 
> monitor job progress. However the information exposed in the API is 
> relatively limited. It'll be much more useful if we can enhance the API to 
> expose more data.
> Some improvement for example may include but not limited to:
> 1. Stage submission and completion time.
> 2. Task metrics.
> The requirement is initially identified for the hive on spark 
> project(HIVE-7292), other application should benefit as well.






[jira] [Commented] (SPARK-7081) Faster sort-based shuffle path using binary processing cache-aware sort

2015-05-28 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562386#comment-14562386
 ] 

Rui Li commented on SPARK-7081:
---

Hi [~joshrosen], requiring the dependency to have no aggregation or key
ordering seems to prevent lots of shuffles from leveraging this new
optimization, e.g. reduceByKey and sortByKey. Do you have any plans to relax
this limitation?
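
For context, a small sketch of which shuffles qualify under the conditions
listed in the description (local mode and Kryo are used only for illustration):
{code}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]").setAppName("tungsten-sort-demo")
  .set("spark.shuffle.manager", "tungsten-sort")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

val pairs = sc.parallelize(1 to 1000).map(i => (i % 10, i))
pairs.partitionBy(new HashPartitioner(4)).count()  // no aggregation or ordering: new path applies
pairs.reduceByKey(_ + _).count()                   // map-side aggregation: falls back to sort shuffle
pairs.sortByKey().count()                          // key ordering: falls back to sort shuffle
{code}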

 Faster sort-based shuffle path using binary processing cache-aware sort
 ---

 Key: SPARK-7081
 URL: https://issues.apache.org/jira/browse/SPARK-7081
 Project: Spark
  Issue Type: New Feature
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Josh Rosen
 Fix For: 1.4.0


 (Description copied from GitHub):
 This patch introduces a new shuffle manager that enhances the existing 
 sort-based shuffle with a new cache-friendly sort algorithm that operates 
 directly on binary data. The goals of this patch are to lower memory usage 
 and Java object overheads during shuffle and to speed up sorting. It also 
 lays groundwork for follow-up patches that will enable end-to-end processing 
 of serialized records.
 The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting 
 `spark.shuffle.manager=tungsten-sort` in SparkConf.
 The new shuffle manager uses directly-managed memory to implement several 
 performance optimizations for certain types of shuffles. In cases where the 
 new performance optimizations cannot be applied, the new shuffle manager 
 delegates to SortShuffleManager to handle those shuffles.
 UnsafeShuffleManager's optimizations will apply when _all_ of the following 
 conditions hold:
  - The shuffle dependency specifies no aggregation or output ordering.
  - The shuffle serializer supports relocation of serialized values (this is 
 currently supported
by KryoSerializer and Spark SQL's custom serializers).
  - The shuffle produces fewer than 16777216 output partitions.
  - No individual record is larger than 128 MB when serialized.
 In addition, extra spill-merging optimizations are automatically applied when 
 the shuffle compression codec supports concatenation of serialized streams. 
 This is currently supported by Spark's LZF serializer.
 At a high-level, UnsafeShuffleManager's design is similar to Spark's existing 
 SortShuffleManager.  In sort-based shuffle, incoming records are sorted 
 according to their target partition ids, then written to a single map output 
 file. Reducers fetch contiguous regions of this file in order to read their 
 portion of the map output. In cases where the map output data is too large to 
 fit in memory, sorted subsets of the output can be spilled to disk and those 
 on-disk files are merged to produce the final output file.
 UnsafeShuffleManager optimizes this process in several ways:
  - Its sort operates on serialized binary data rather than Java objects, 
 which reduces memory consumption and GC overheads. This optimization requires 
 the record serializer to have certain properties to allow serialized records 
 to be re-ordered without requiring deserialization.  See SPARK-4550, where 
 this optimization was first proposed and implemented, for more details.
  - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) 
 that sorts arrays of compressed record pointers and partition ids. By using 
 only 8 bytes of space per record in the sorting array, this fits more of the 
 array into cache.
  - The spill merging procedure operates on blocks of serialized records that 
 belong to the same partition and does not need to deserialize records during 
 the merge.
  - When the spill compression codec supports concatenation of compressed 
 data, the spill merge simply concatenates the serialized and compressed spill 
 partitions to produce the final output partition.  This allows efficient data 
 copying methods, like NIO's `transferTo`, to be used and avoids the need to 
 allocate decompression or copying buffers during the merge.
 The shuffle read path is unchanged.
 This patch is similar to http://issues.apache.org/jira/browse/SPARK-4550  but 
 uses a slightly different implementation. The `unsafe`-based implementation 
 featured in this patch lays the groundwork for followup patches that will 
 enable sorting to operate on serialized data pages that will be prepared by 
 Spark SQL's new `unsafe` operators.






[jira] [Commented] (SPARK-2621) Update task InputMetrics incrementally

2015-01-08 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14270714#comment-14270714
 ] 

Rui Li commented on SPARK-2621:
---

Hey [~sandyr], it seems after this change we require the input split to be a 
{{FileSplit}} to collect input metrics. Do you think it's possible to support 
{{CombineFileSplit}} as well? Thanks!

 Update task InputMetrics incrementally
 --

 Key: SPARK-2621
 URL: https://issues.apache.org/jira/browse/SPARK-2621
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Sandy Ryza
Assignee: Sandy Ryza
 Fix For: 1.2.0









[jira] [Created] (SPARK-5080) Expose more cluster resource information to user

2015-01-04 Thread Rui Li (JIRA)
Rui Li created SPARK-5080:
-

 Summary: Expose more cluster resource information to user
 Key: SPARK-5080
 URL: https://issues.apache.org/jira/browse/SPARK-5080
 Project: Spark
  Issue Type: Improvement
Reporter: Rui Li


It'll be useful if users can get detailed cluster resource info, e.g.
granted/allocated executors, memory and CPU.
Such information is available via the WebUI, but it seems SparkContext doesn't
provide APIs for it.
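
For reference, roughly what SparkContext exposes today (a sketch; {{sc}} is an
existing SparkContext): block-manager memory per executor, but nothing about
granted/allocated executors or CPU.
{code}
// (maxMem, remainingMem) of the block manager on each executor, in bytes
val memByExecutor = sc.getExecutorMemoryStatus
memByExecutor.foreach { case (executor, (maxMem, remainingMem)) =>
  println(s"$executor: max=$maxMem remaining=$remainingMem")
}
{code}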






[jira] [Commented] (SPARK-4921) Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks

2014-12-22 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14256425#comment-14256425
 ] 

Rui Li commented on SPARK-4921:
---

I'm not sure if this is intended, but returning PROCESS_LOCAL for NO_PREF tasks
may reset {{currentLocalityIndex}} to 0, which can cause more scheduling delay
later. There seems to be a check to avoid this, but I doubt it's sufficient:
{code}
  // Update our locality level for delay scheduling
  // NO_PREF will not affect the variables related to delay scheduling
  if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
  }
{code}

 Performance issue caused by TaskSetManager returning  PROCESS_LOCAL for 
 NO_PREF tasks
 -

 Key: SPARK-4921
 URL: https://issues.apache.org/jira/browse/SPARK-4921
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Xuefu Zhang
 Attachments: NO_PREF.patch


 During research for HIVE-9153, we found that TaskSetManager returns 
 PROCESS_LOCAL for NO_PREF tasks, which may cause performance degradation. 
 Changing the return value to NO_PREF, as demonstrated in the attached patch, 
 seemingly improves the performance.






[jira] [Commented] (SPARK-2321) Design a proper progress reporting event listener API

2014-11-18 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14217485#comment-14217485
 ] 

Rui Li commented on SPARK-2321:
---

Hi [~joshrosen],

Shall we make {{SparkJobInfo}} and {{SparkStageInfo}} serializable? Mainly for
the case when the Spark context runs remotely. What's your opinion?

 Design a proper progress reporting  event listener API
 ---

 Key: SPARK-2321
 URL: https://issues.apache.org/jira/browse/SPARK-2321
 Project: Spark
  Issue Type: Improvement
  Components: Java API, Spark Core
Affects Versions: 1.0.0
Reporter: Reynold Xin
Assignee: Josh Rosen
Priority: Critical
 Fix For: 1.2.0


 This is a ticket to track progress on redesigning the SparkListener and 
 JobProgressListener API.
 There are multiple problems with the current design, including:
 0. I'm not sure if the API is usable in Java (there are at least some enums 
 we used in Scala and a bunch of case classes that might complicate things).
 1. The whole API is marked as DeveloperApi, because we haven't paid a lot of 
 attention to it yet. Something as important as progress reporting deserves a 
 more stable API.
 2. There is no easy way to connect jobs with stages. Similarly, there is no 
 easy way to connect job groups with jobs / stages.
 3. JobProgressListener itself has no encapsulation at all. States can be 
 arbitrarily mutated by external programs. Variable names are sort of randomly 
 decided and inconsistent. 
 We should just revisit these and propose a new, concrete design. 






[jira] [Created] (SPARK-4440) Enhance the job progress API to expose more information

2014-11-16 Thread Rui Li (JIRA)
Rui Li created SPARK-4440:
-

 Summary: Enhance the job progress API to expose more information
 Key: SPARK-4440
 URL: https://issues.apache.org/jira/browse/SPARK-4440
 Project: Spark
  Issue Type: Improvement
Reporter: Rui Li


The progress API introduced in SPARK-2321 provides a new way for users to
monitor job progress. However, the information exposed in the API is relatively
limited. It'll be much more useful if we can enhance the API to expose more
data.
Some improvements, for example, may include but are not limited to:
1. Stage submission and completion time.
2. Task metrics.
The requirement was initially identified for the Hive on Spark project
(HIVE-7292), but other applications should benefit as well.






[jira] [Commented] (SPARK-2321) Design a proper progress reporting event listener API

2014-11-16 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14214223#comment-14214223
 ] 

Rui Li commented on SPARK-2321:
---

Hey [~joshrosen],

Thanks a lot for the update! I created SPARK-4440 for the enhancement.

 Design a proper progress reporting  event listener API
 ---

 Key: SPARK-2321
 URL: https://issues.apache.org/jira/browse/SPARK-2321
 Project: Spark
  Issue Type: Improvement
  Components: Java API, Spark Core
Affects Versions: 1.0.0
Reporter: Reynold Xin
Assignee: Josh Rosen
Priority: Critical
 Fix For: 1.2.0


 This is a ticket to track progress on redesigning the SparkListener and 
 JobProgressListener API.
 There are multiple problems with the current design, including:
 0. I'm not sure if the API is usable in Java (there are at least some enums 
 we used in Scala and a bunch of case classes that might complicate things).
 1. The whole API is marked as DeveloperApi, because we haven't paid a lot of 
 attention to it yet. Something as important as progress reporting deserves a 
 more stable API.
 2. There is no easy way to connect jobs with stages. Similarly, there is no 
 easy way to connect job groups with jobs / stages.
 3. JobProgressListener itself has no encapsulation at all. States can be 
 arbitrarily mutated by external programs. Variable names are sort of randomly 
 decided and inconsistent. 
 We should just revisit these and propose a new, concrete design. 






[jira] [Commented] (SPARK-2321) Design a proper progress reporting event listener API

2014-11-14 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212000#comment-14212000
 ] 

Rui Li commented on SPARK-2321:
---

Hi [~joshrosen],

The new API is quite useful. But the information exposed is relatively limited
at the moment. Do you have any plans to enhance it? For example, submission and
completion times are not available in {{SparkStageInfo}}, while they're provided
in {{StageInfo}}.
Thanks!
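
For example, roughly what we can get from the new API at the moment (a sketch;
{{sc}} is an existing SparkContext):
{code}
val tracker = sc.statusTracker
for (stageId <- tracker.getActiveStageIds; info <- tracker.getStageInfo(stageId)) {
  // task counts are available here, but not submission/completion time
  println(s"stage $stageId: ${info.numCompletedTasks} of ${info.numTasks} tasks done")
}
{code}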

 Design a proper progress reporting  event listener API
 ---

 Key: SPARK-2321
 URL: https://issues.apache.org/jira/browse/SPARK-2321
 Project: Spark
  Issue Type: Improvement
  Components: Java API, Spark Core
Affects Versions: 1.0.0
Reporter: Reynold Xin
Assignee: Josh Rosen
Priority: Critical
 Fix For: 1.2.0


 This is a ticket to track progress on redesigning the SparkListener and 
 JobProgressListener API.
 There are multiple problems with the current design, including:
 0. I'm not sure if the API is usable in Java (there are at least some enums 
 we used in Scala and a bunch of case classes that might complicate things).
 1. The whole API is marked as DeveloperApi, because we haven't paid a lot of 
 attention to it yet. Something as important as progress reporting deserves a 
 more stable API.
 2. There is no easy way to connect jobs with stages. Similarly, there is no 
 easy way to connect job groups with jobs / stages.
 3. JobProgressListener itself has no encapsulation at all. States can be 
 arbitrarily mutated by external programs. Variable names are sort of randomly 
 decided and inconsistent. 
 We should just revisit these and propose a new, concrete design. 






[jira] [Commented] (SPARK-2636) no where to get job identifier while submit spark job through spark API

2014-08-25 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14110172#comment-14110172
 ] 

Rui Li commented on SPARK-2636:
---

Just want to make sure I understand everything correctly:

I think the user submits a job via an RDD action, which in turn calls
{{SparkContext.runJob -> DAGScheduler.runJob -> DAGScheduler.submitJob ->
DAGScheduler.handleJobSubmitted}}. The requirement is that we should return some
job ID to the user, so I think putting that in a DAGScheduler method doesn't
help? BTW, {{DAGScheduler.submitJob}} returns a {{JobWaiter}} which contains the
job ID.

Also, by job ID, do we mean {{org.apache.spark.streaming.scheduler.Job.id}} or
{{org.apache.spark.scheduler.ActiveJob.jobId}}?

Please let me know if I misunderstand anything.

 no where to get job identifier while submit spark job through spark API
 ---

 Key: SPARK-2636
 URL: https://issues.apache.org/jira/browse/SPARK-2636
 Project: Spark
  Issue Type: New Feature
  Components: Java API
Reporter: Chengxiang Li
  Labels: hive

 In Hive on Spark, we want to track spark job status through Spark API, the 
 basic idea is as following:
 # create an hive-specified spark listener and register it to spark listener 
 bus.
 # hive-specified spark listener generate job status by spark listener events.
 # hive driver track job status through hive-specified spark listener. 
 the current problem is that hive driver need job identifier to track 
 specified job status through spark listener, but there is no spark API to get 
 job identifier(like job id) while submit spark job.
 I think other project whoever try to track job status with spark API would 
 suffer from this as well.






[jira] [Created] (SPARK-2740) In JavaPairRdd, allow user to specify ascending and numPartitions for sortByKey

2014-07-29 Thread Rui Li (JIRA)
Rui Li created SPARK-2740:
-

 Summary: In JavaPairRdd, allow user to specify ascending and 
numPartitions for sortByKey
 Key: SPARK-2740
 URL: https://issues.apache.org/jira/browse/SPARK-2740
 Project: Spark
  Issue Type: Improvement
Reporter: Rui Li
Priority: Minor


It would be more convenient if users could specify ascending and numPartitions
when calling sortByKey.





[jira] [Commented] (SPARK-2387) Remove the stage barrier for better resource utilization

2014-07-29 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14078798#comment-14078798
 ] 

Rui Li commented on SPARK-2387:
---

Right, thanks [~joshrosen] for pointing that out. These are just some initial
thoughts, anyway :)

 Remove the stage barrier for better resource utilization
 

 Key: SPARK-2387
 URL: https://issues.apache.org/jira/browse/SPARK-2387
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Reporter: Rui Li

 DAGScheduler divides a Spark job into multiple stages according to RDD 
 dependencies. Whenever there’s a shuffle dependency, DAGScheduler creates a 
 shuffle map stage on the map side, and another stage depending on that stage.
 Currently, the downstream stage cannot start until all its depended stages 
 have finished. This barrier between stages leads to idle slots when waiting 
 for the last few upstream tasks to finish and thus wasting cluster resources.
 Therefore we propose to remove the barrier and pre-start the reduce stage 
 once there're free slots. This can achieve better resource utilization and 
 improve the overall job performance, especially when there're lots of 
 executors granted to the application.





[jira] [Commented] (SPARK-2387) Remove the stage barrier for better resource utilization

2014-07-08 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14054862#comment-14054862
 ] 

Rui Li commented on SPARK-2387:
---

PR created at
https://github.com/apache/spark/pull/1328

 Remove the stage barrier for better resource utilization
 

 Key: SPARK-2387
 URL: https://issues.apache.org/jira/browse/SPARK-2387
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Reporter: Rui Li

 DAGScheduler divides a Spark job into multiple stages according to RDD 
 dependencies. Whenever there’s a shuffle dependency, DAGScheduler creates a 
 shuffle map stage on the map side, and another stage depending on that stage.
 Currently, the downstream stage cannot start until all its depended stages 
 have finished. This barrier between stages leads to idle slots when waiting 
 for the last few upstream tasks to finish and thus wasting cluster resources.
 Therefore we propose to remove the barrier and pre-start the reduce stage 
 once there're free slots. This can achieve better resource utilization and 
 improve the overall job performance, especially when there're lots of 
 executors granted to the application.





[jira] [Created] (SPARK-2387) Remove the stage barrier for better resource utilization

2014-07-07 Thread Rui Li (JIRA)
Rui Li created SPARK-2387:
-

 Summary: Remove the stage barrier for better resource utilization
 Key: SPARK-2387
 URL: https://issues.apache.org/jira/browse/SPARK-2387
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Reporter: Rui Li


DAGScheduler divides a Spark job into multiple stages according to RDD
dependencies. Whenever there's a shuffle dependency, DAGScheduler creates a
shuffle map stage on the map side, and another stage depending on that stage.
Currently, the downstream stage cannot start until all the stages it depends on
have finished. This barrier between stages leads to idle slots while waiting for
the last few upstream tasks to finish, and thus wastes cluster resources.
Therefore we propose to remove the barrier and pre-start the reduce stage once
there are free slots. This can achieve better resource utilization and improve
the overall job performance, especially when there are lots of executors granted
to the application.





[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-03 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14052103#comment-14052103
 ] 

Rui Li commented on SPARK-2277:
---

With [PR #892|https://github.com/apache/spark/pull/892], we'll check whether a
task's preference is available when adding it to the pending lists.
TaskScheduler tracks information about executors/hosts, so that TaskSetManager
can check whether the preferred executor/host is available.

TaskScheduler also provides getRackForHost to get the corresponding rack for a
host (currently it only returns None). I think this is prior knowledge about the
cluster topology, and it does not indicate whether any host on that rack has
been granted to this Spark app. Therefore we don't know the availability of the
preferred rack.

 Make TaskScheduler track whether there's host on a rack
 ---

 Key: SPARK-2277
 URL: https://issues.apache.org/jira/browse/SPARK-2277
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

 When TaskSetManager adds a pending task, it checks whether the tasks's 
 preferred location is available. Regarding RACK_LOCAL task, we consider the 
 preferred rack available if such a rack is defined for the preferred host. 
 This is incorrect as there may be no alive hosts on that rack at all. 
 Therefore, TaskScheduler should track the hosts on each rack, and provides an 
 API for TaskSetManager to check if there's host alive on a specific rack.





[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-02 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14050951#comment-14050951
 ] 

Rui Li commented on SPARK-2277:
---

Suppose task1 prefers node1 but node1 is not available at the moment. However,
we know node1 is on rack1, which makes task1 prefer rack1 at RACK_LOCAL
locality. The problem is that we don't know whether there's any alive host on
rack1, so we cannot check the availability of this preference.
Please let me know if I misunderstand anything :)

 Make TaskScheduler track whether there's host on a rack
 ---

 Key: SPARK-2277
 URL: https://issues.apache.org/jira/browse/SPARK-2277
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

 When TaskSetManager adds a pending task, it checks whether the tasks's 
 preferred location is available. Regarding RACK_LOCAL task, we consider the 
 preferred rack available if such a rack is defined for the preferred host. 
 This is incorrect as there may be no alive hosts on that rack at all. 
 Therefore, TaskScheduler should track the hosts on each rack, and provides an 
 API for TaskSetManager to check if there's host alive on a specific rack.





[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-02 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14050952#comment-14050952
 ] 

Rui Li commented on SPARK-2277:
---

PR created at:
https://github.com/apache/spark/pull/1212

 Make TaskScheduler track whether there's host on a rack
 ---

 Key: SPARK-2277
 URL: https://issues.apache.org/jira/browse/SPARK-2277
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

 When TaskSetManager adds a pending task, it checks whether the tasks's 
 preferred location is available. Regarding RACK_LOCAL task, we consider the 
 preferred rack available if such a rack is defined for the preferred host. 
 This is incorrect as there may be no alive hosts on that rack at all. 
 Therefore, TaskScheduler should track the hosts on each rack, and provides an 
 API for TaskSetManager to check if there's host alive on a specific rack.





[jira] [Created] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-06-25 Thread Rui Li (JIRA)
Rui Li created SPARK-2277:
-

 Summary: Make TaskScheduler track whether there's host on a rack
 Key: SPARK-2277
 URL: https://issues.apache.org/jira/browse/SPARK-2277
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li


When TaskSetManager adds a pending task, it checks whether the task's preferred
location is available. For a RACK_LOCAL task, we consider the preferred rack
available if such a rack is defined for the preferred host. This is incorrect,
as there may be no alive hosts on that rack at all. Therefore, TaskScheduler
should track the hosts on each rack and provide an API for TaskSetManager to
check whether there's any host alive on a specific rack.
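
A minimal sketch of the kind of bookkeeping being proposed (names are
illustrative only, not the actual patch):
{code}
import scala.collection.mutable

class RackTracker {
  private val hostsByRack = mutable.HashMap[String, mutable.Set[String]]()

  def addHost(host: String, rack: Option[String]): Unit =
    rack.foreach(r => hostsByRack.getOrElseUpdate(r, mutable.Set()) += host)

  def removeHost(host: String, rack: Option[String]): Unit =
    rack.foreach(r => hostsByRack.get(r).foreach(_ -= host))

  // what TaskSetManager would call before treating a rack preference as available
  def hasHostAliveOnRack(rack: String): Boolean =
    hostsByRack.get(rack).exists(_.nonEmpty)
}
{code}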





[jira] [Created] (SPARK-1937) Tasks can be submitted before executors are registered

2014-05-27 Thread Rui Li (JIRA)
Rui Li created SPARK-1937:
-

 Summary: Tasks can be submitted before executors are registered
 Key: SPARK-1937
 URL: https://issues.apache.org/jira/browse/SPARK-1937
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li


During construction, TaskSetManager assigns tasks to several pending lists
according to the tasks' preferred locations. If the desired location is
unavailable, it then assigns the task to “pendingTasksWithNoPrefs”, a list
containing tasks without preferred locations.
The problem is that tasks may be submitted before the executors get registered
with the driver, in which case TaskSetManager will assign all the tasks to
pendingTasksWithNoPrefs. Later, when it looks for a task to schedule, it will
pick one from this list and assign it to an arbitrary executor, since
TaskSetManager considers these tasks able to run equally well on any node.
This problem deprives the job of the benefits of data locality, slows the whole
job down, and can cause imbalance between executors.
I ran into this issue when running a Spark program on a 7-node cluster
(node6~node12). The program processes 100GB of data.
Since the data was uploaded to HDFS from node6, this node has a complete copy of
the data. As a result, node6 finishes tasks much faster, which in turn makes it
complete disproportionately more tasks than the other nodes.
To solve this issue, I think we shouldn't check the availability of
executors/hosts when constructing TaskSetManager. If a task prefers a node, we
simply add the task to that node's pending list. When the node is added later
on, TaskSetManager can schedule the task at the proper locality level. If the
preferred node(s) unfortunately never get added, TaskSetManager can still
schedule the task at locality level “ANY”.
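
A rough sketch of the proposed change (illustrative only, not the actual patch):
file the task under its preferred hosts without checking whether they are
currently registered, and only use the no-prefs list when the task truly has no
preference.
{code}
import scala.collection.mutable

def addPendingTask(
    index: Int,
    preferredHosts: Seq[String],
    pendingTasksForHost: mutable.HashMap[String, mutable.ArrayBuffer[Int]],
    pendingTasksWithNoPrefs: mutable.ArrayBuffer[Int]): Unit = {
  if (preferredHosts.isEmpty) {
    pendingTasksWithNoPrefs += index
  } else {
    // no availability check here: if a preferred host registers later, the task
    // can still be scheduled at the proper locality level; if it never does,
    // the task is eventually scheduled at locality level ANY
    for (host <- preferredHosts) {
      pendingTasksForHost.getOrElseUpdate(host, mutable.ArrayBuffer()) += index
    }
  }
}
{code}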





[jira] [Updated] (SPARK-1937) Tasks can be submitted before executors are registered

2014-05-27 Thread Rui Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Li updated SPARK-1937:
--

Attachment: Before-patch.png

 Tasks can be submitted before executors are registered
 --

 Key: SPARK-1937
 URL: https://issues.apache.org/jira/browse/SPARK-1937
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li
 Attachments: After-patch.PNG, Before-patch.png


 During construction, TaskSetManager will assign tasks to several pending 
 lists according to the tasks’ preferred locations. If the desired location is 
 unavailable, it’ll then assign this task to “pendingTasksWithNoPrefs”, a list 
 containing tasks without preferred locations.
 The problem is that tasks may be submitted before the executors get 
 registered with the driver, in which case TaskSetManager will assign all the 
 tasks to pendingTasksWithNoPrefs. Later when it looks for a task to schedule, 
 it will pick one from this list and assign it to arbitrary executor, since 
 TaskSetManager considers the tasks can run equally well on any node.
 This problem deprives benefits of data locality, drags the whole job slow and 
 can cause imbalance between executors.
 I ran into this issue when running a spark program on a 7-node cluster 
 (node6~node12). The program processes 100GB data.
 Since the data is uploaded to HDFS from node6, this node has a complete copy 
 of the data and as a result, node6 finishes tasks much faster, which in turn 
 makes it complete dis-proportionally more tasks than other nodes.
 To solve this issue, I think we shouldn't check availability of 
 executors/hosts when constructing TaskSetManager. If a task prefers a node, 
 we simply add the task to that node’s pending list. When later on the node is 
 added, TaskSetManager can schedule the task according to proper locality 
 level. If unfortunately the preferred node(s) never gets added, 
 TaskSetManager can still schedule the task at locality level “ANY”.





[jira] [Updated] (SPARK-1937) Tasks can be submitted before executors are registered

2014-05-27 Thread Rui Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Li updated SPARK-1937:
--

Attachment: RSBTest.scala

The program that triggers the problem.
With the patch, the total execution time of the job drops from nearly 600s to
around 250s.

 Tasks can be submitted before executors are registered
 --

 Key: SPARK-1937
 URL: https://issues.apache.org/jira/browse/SPARK-1937
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li
 Attachments: After-patch.PNG, Before-patch.png, RSBTest.scala


 During construction, TaskSetManager will assign tasks to several pending 
 lists according to the tasks’ preferred locations. If the desired location is 
 unavailable, it’ll then assign this task to “pendingTasksWithNoPrefs”, a list 
 containing tasks without preferred locations.
 The problem is that tasks may be submitted before the executors get 
 registered with the driver, in which case TaskSetManager will assign all the 
 tasks to pendingTasksWithNoPrefs. Later when it looks for a task to schedule, 
 it will pick one from this list and assign it to arbitrary executor, since 
 TaskSetManager considers the tasks can run equally well on any node.
 This problem deprives benefits of data locality, drags the whole job slow and 
 can cause imbalance between executors.
 I ran into this issue when running a spark program on a 7-node cluster 
 (node6~node12). The program processes 100GB data.
 Since the data is uploaded to HDFS from node6, this node has a complete copy 
 of the data and as a result, node6 finishes tasks much faster, which in turn 
 makes it complete dis-proportionally more tasks than other nodes.
 To solve this issue, I think we shouldn't check availability of 
 executors/hosts when constructing TaskSetManager. If a task prefers a node, 
 we simply add the task to that node’s pending list. When later on the node is 
 added, TaskSetManager can schedule the task according to proper locality 
 level. If unfortunately the preferred node(s) never gets added, 
 TaskSetManager can still schedule the task at locality level “ANY”.


