[jira] [Created] (SPARK-25085) Insert overwrite a non-partitioned table can delete table folder
Rui Li created SPARK-25085:
---------------------------

Summary: Insert overwrite a non-partitioned table can delete table folder
Key: SPARK-25085
URL: https://issues.apache.org/jira/browse/SPARK-25085
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.0
Reporter: Rui Li

When running INSERT OVERWRITE against a data source table, Spark first deletes all the partitions. For a non-partitioned table, it deletes the table folder itself, which is wrong because the table folder may carry information such as ACL entries.
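A minimal reproduction sketch (the table name is illustrative; assumes a Spark 2.3 session and a table folder on which an admin has set HDFS ACLs):

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("spark-25085-repro").getOrCreate()

// Non-partitioned data source table; its folder may carry ACLs set by an admin.
spark.sql("CREATE TABLE acl_table (x INT) USING parquet")
spark.sql("INSERT OVERWRITE TABLE acl_table VALUES (1), (2)")
// The overwrite deletes and recreates the table folder itself, so any
// ACL entries on the folder are lost along with the old data.
spark.sql("INSERT OVERWRITE TABLE acl_table VALUES (3), (4)")
{code}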
[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again
[ https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509214#comment-16509214 ]

Rui Li commented on SPARK-24387:
--------------------------------

Yes, blacklisting can be used to avoid the issue. But the blacklist can be turned off, or configured to be more tolerant, so it's better to have a more reliable solution.
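For reference, a sketch of the blacklist settings being discussed (property names as in Spark 2.1+; the values are illustrative, not recommendations):

{code}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  // How many failed attempts of one task on an executor before that
  // executor is blacklisted for the task.
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
  // How many different failed tasks on an executor before it is
  // blacklisted for the whole stage.
  .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")
{code}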
[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again
[ https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492563#comment-16492563 ]

Rui Li commented on SPARK-24387:
--------------------------------

Instead of letting HeartbeatReceiver tell the TaskScheduler that the executor is lost, I'm wondering whether it makes sense to let CoarseGrainedSchedulerBackend call executorLost in the killExecutors method, at which point the executor has been marked as pending-to-remove and won't be offered again. [~kayousterhout], [~vanzin], would you mind sharing your thoughts? Thanks.
[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again
[ https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490309#comment-16490309 ]

Rui Li commented on SPARK-24387:
--------------------------------

When HeartbeatReceiver finds that an executor's heartbeat has timed out, it informs the TaskScheduler and kills the executor asynchronously. When the TaskScheduler handles the lost executor, it tries to revive offers from the backend. So I think there's a race condition: the backend may make offers before the executor is actually killed. And since this is the only executor left, it's offered to the TaskScheduler and the retried task is scheduled onto it.

Also, when killing a heartbeat-timeout executor, we expect a replacement executor to be launched. But by the time the new executor comes up, there's no task left for it to run, so it's kept idle until killed by dynamic allocation.
[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again
[ https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490293#comment-16490293 ]

Rui Li commented on SPARK-24387:
--------------------------------

A snippet of the log w/ some fields masked:

{noformat}
[Stage 2:==>(199 + 1) / 200]18/05/20 05:37:07 WARN HeartbeatReceiver: Removing executor 1100 with no recent heartbeats: 345110 ms exceeds timeout 30 ms
18/05/20 05:37:07 ERROR YarnClusterScheduler: Lost executor 1100 on HOSTA: Executor heartbeat timed out after 345110 ms
18/05/20 05:37:07 WARN TaskSetManager: Lost task 55.0 in stage 2.0 (TID 12080, HOSTA, executor 1100): ExecutorLostFailure (executor 1100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 345110 ms
18/05/20 05:37:07 INFO DAGScheduler: Executor lost: 1100 (epoch 2)
18/05/20 05:37:07 INFO DAGScheduler: Host added was in lost list earlier: HOSTA
18/05/20 05:37:07 INFO TaskSetManager: Starting task 55.1 in stage 2.0 (TID 12225, HOSTA, executor 1100, partition 55, PROCESS_LOCAL, 6227 bytes)
{noformat}
[jira] [Created] (SPARK-24387) Heartbeat-timeout executor is added back and used again
Rui Li created SPARK-24387:
---------------------------

Summary: Heartbeat-timeout executor is added back and used again
Key: SPARK-24387
URL: https://issues.apache.org/jira/browse/SPARK-24387
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.1.0
Reporter: Rui Li

In our job, when there's only one task and one executor running, the executor's heartbeat is lost and the driver decides to remove it. However, the executor is added back and the task's retry attempt is scheduled to that executor, almost immediately after the executor is marked as lost.
[jira] [Commented] (SPARK-24116) SparkSQL inserting overwrite table has inconsistent behavior regarding HDFS trash
[ https://issues.apache.org/jira/browse/SPARK-24116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462040#comment-16462040 ]

Rui Li commented on SPARK-24116:
--------------------------------

To reproduce:
{code}
create table test_text(x int);
insert overwrite table test_text values (1),(2);
insert overwrite table test_text values (3),(4);    -- the old data is moved to trash

create table test_parquet(x int) using parquet;
insert overwrite table test_parquet values (1),(2);
insert overwrite table test_parquet values (3),(4); -- the old data is not moved to trash
{code}
[jira] [Commented] (SPARK-24116) SparkSQL inserting overwrite table has inconsistent behavior regarding HDFS trash
[ https://issues.apache.org/jira/browse/SPARK-24116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460472#comment-16460472 ]

Rui Li commented on SPARK-24116:
--------------------------------

[~hyukjin.kwon], sorry for the late response. For example, assume we have two non-partitioned tables, one a text table and the other a Parquet table. If we insert overwrite the text table, the old data goes to HDFS trash. But if we insert overwrite the Parquet table, the old data doesn't go to trash.

I believe SparkSQL has different code paths to load data into different kinds of tables, and whether old data goes to trash is inconsistent among these code paths. Specifically, {{Hive::loadTable}} moves old data to trash, but other code paths seem to simply delete it. Ideally, SparkSQL would let the user specify whether old data goes to trash when overwriting, a feature like HIVE-15880.
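For context, a sketch of how a load path can move old data to trash instead of deleting it outright, using Hadoop's Trash API (a minimal illustration, not the actual Hive or Spark code; the path is hypothetical):

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, Trash}

val conf = new Configuration()
val oldData = new Path("/warehouse/test_text/part-00000") // hypothetical path
val fs = oldData.getFileSystem(conf)

// moveToAppropriateTrash respects fs.trash.interval; it returns false when
// trash is disabled, in which case callers typically fall back to a delete.
val movedToTrash = Trash.moveToAppropriateTrash(fs, oldData, conf)
if (!movedToTrash) {
  fs.delete(oldData, true) // permanent delete as a fallback
}
{code}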
[jira] [Created] (SPARK-24116) SparkSQL inserting overwrite table has inconsistent behavior regarding HDFS trash
Rui Li created SPARK-24116:
---------------------------

Summary: SparkSQL inserting overwrite table has inconsistent behavior regarding HDFS trash
Key: SPARK-24116
URL: https://issues.apache.org/jira/browse/SPARK-24116
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.1.0
Reporter: Rui Li

When inserting overwrite a table, the old data may or may not go to trash based on:
# Data format. E.g. a text table may go to trash but a parquet table doesn't.
# Whether the table is partitioned. E.g. a partitioned text table doesn't go to trash while a non-partitioned one does.
[jira] [Commented] (SPARK-24010) Select from table needs read access on DB folder when storage based auth is enabled
[ https://issues.apache.org/jira/browse/SPARK-24010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16446761#comment-16446761 ]

Rui Li commented on SPARK-24010:
--------------------------------

[~rxin], thanks for your reply. I noticed that InMemoryCatalog even explicitly requires the DB to exist when testing whether a table exists. If it works this way by design, I guess we have to investigate whether we can make the Hive metastore skip authorization when deciding whether a DB exists, and that probably needs a new API. Do you have any suggestions for other possible solutions?
[jira] [Commented] (SPARK-24010) Select from table needs read access on DB folder when storage based auth is enabled
[ https://issues.apache.org/jira/browse/SPARK-24010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442146#comment-16442146 ]

Rui Li commented on SPARK-24010:
--------------------------------

Hi [~rxin], I think the databaseExists check was added in SPARK-14869. Do you remember why we need to check whether the DB exists? In {{HiveClientImpl::tableExists}}, we're telling Hive not to throw exceptions, so there shouldn't be exceptions if the DB doesn't exist.
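A sketch of the existence check being discussed (paraphrased, assuming a Hive client with Hive's getTable(db, table, throwException) signature):

{code}
import org.apache.hadoop.hive.ql.metadata.Hive

// Paraphrased sketch: check table existence directly, without a separate
// get_database call that would trigger storage-based authorization on the
// DB folder. With throwException = false, getTable returns null instead of
// throwing when the table (or its DB) doesn't exist.
def tableExists(client: Hive, dbName: String, tableName: String): Boolean = {
  client.getTable(dbName, tableName, false /* throwException */) != null
}
{code}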
[jira] [Created] (SPARK-24010) Select from table needs read access on DB folder when storage based auth is enabled
Rui Li created SPARK-24010:
---------------------------

Summary: Select from table needs read access on DB folder when storage based auth is enabled
Key: SPARK-24010
URL: https://issues.apache.org/jira/browse/SPARK-24010
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.1.0
Reporter: Rui Li

When HMS enables storage based authorization, SparkSQL requires read access on the DB folder in order to select from a table. Such a requirement doesn't seem necessary and is not imposed by Hive.

The reason is that when the Analyzer tries to resolve a relation, it calls [SessionCatalog::databaseExists|https://github.com/apache/spark/blob/v2.1.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L469]. This calls the metastore get_database API, which performs an authorization check.
[jira] [Commented] (SPARK-14958) Failed task hangs if error is encountered when getting task result
[ https://issues.apache.org/jira/browse/SPARK-14958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561094#comment-15561094 ]

Rui Li commented on SPARK-14958:
--------------------------------

I hit the issue with Spark 1.6.1.
[jira] [Created] (SPARK-14958) Failed task hangs if error is encountered when getting task result
Rui Li created SPARK-14958:
---------------------------

Summary: Failed task hangs if error is encountered when getting task result
Key: SPARK-14958
URL: https://issues.apache.org/jira/browse/SPARK-14958
Project: Spark
Issue Type: Bug
Reporter: Rui Li

In {{TaskResultGetter}}, if we get an error when deserializing {{TaskEndReason}}, the TaskScheduler won't have a chance to handle the failed task and the task just hangs.

{code}
def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
    serializedData: ByteBuffer) {
  var reason: TaskEndReason = UnknownReason
  try {
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        val loader = Utils.getContextOrSparkClassLoader
        try {
          if (serializedData != null && serializedData.limit() > 0) {
            reason = serializer.get().deserialize[TaskEndReason](
              serializedData, loader)
          }
        } catch {
          case cnd: ClassNotFoundException =>
            // Log an error but keep going here -- the task failed, so not catastrophic
            // if we can't deserialize the reason.
            logError(
              "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
          case ex: Exception => {}
        }
        scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
      }
    })
  } catch {
    case e: RejectedExecutionException if sparkEnv.isStopped =>
      // ignore it
  }
}
{code}

In my specific case, I got a NoClassDefFoundError and the failed task hangs forever.
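A short illustration of why the NoClassDefFoundError escapes the catch block above: the handlers match ClassNotFoundException and Exception, but NoClassDefFoundError extends java.lang.Error, so it propagates out of run() before handleFailedTask is reached. A minimal, self-contained sketch (class name is hypothetical):

{code}
// Minimal demonstration that `case ex: Exception` does not catch
// NoClassDefFoundError, which is an Error, not an Exception.
object CatchDemo extends App {
  try {
    throw new NoClassDefFoundError("com.example.MissingClass") // hypothetical class
  } catch {
    case ex: Exception =>
      println("caught: " + ex) // never reached
  }
  // Never printed: the Error propagates past the catch and kills the thread,
  // just as it skips scheduler.handleFailedTask in enqueueFailedTask.
  println("after try")
}
{code}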
[jira] [Commented] (SPARK-4440) Enhance the job progress API to expose more information
[ https://issues.apache.org/jira/browse/SPARK-4440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14791732#comment-14791732 ]

Rui Li commented on SPARK-4440:
-------------------------------

For Hive on Spark, we want the completion time for each stage so we can compute how long the stage takes (there's already a submission time in {{SparkStageInfo}}). It'll be great if we can also get task metrics; currently we have to implement a SparkListener to collect metrics. [~chengxiang li] and [~xuefuz], do you have anything to add?
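A sketch of the workaround mentioned, collecting stage completion times and task metrics through the standard SparkListener API (the aggregation here is simplified to println for illustration):

{code}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

// Simplified listener gathering what the progress API doesn't expose:
// stage completion times and per-task metrics.
class StageTimingListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    for (start <- info.submissionTime; end <- info.completionTime) {
      println(s"Stage ${info.stageId} took ${end - start} ms")
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks.
    Option(taskEnd.taskMetrics).foreach { m =>
      println(s"Task in stage ${taskEnd.stageId}: ran ${m.executorRunTime} ms")
    }
  }
}

// Register with: sc.addSparkListener(new StageTimingListener())
{code}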
[jira] [Commented] (SPARK-7081) Faster sort-based shuffle path using binary processing cache-aware sort
[ https://issues.apache.org/jira/browse/SPARK-7081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562386#comment-14562386 ]

Rui Li commented on SPARK-7081:
-------------------------------

Hi [~joshrosen], requiring the dependency to have no aggregation or key ordering seems to prevent lots of shuffles from leveraging this new optimization, e.g. reduceByKey, sortByKey. Do you have any plans to relax the limitation?

> Faster sort-based shuffle path using binary processing cache-aware sort
> ------------------------------------------------------------------------
>
> Key: SPARK-7081
> URL: https://issues.apache.org/jira/browse/SPARK-7081
> Project: Spark
> Issue Type: New Feature
> Components: Shuffle, Spark Core
> Reporter: Reynold Xin
> Assignee: Josh Rosen
> Fix For: 1.4.0
>
> (Description copied from GitHub):
> This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records.
> The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf. The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles.
> UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold:
> - The shuffle dependency specifies no aggregation or output ordering.
> - The shuffle serializer supports relocation of serialized values (this is currently supported by KryoSerializer and Spark SQL's custom serializers).
> - The shuffle produces fewer than 16777216 output partitions.
> - No individual record is larger than 128 MB when serialized.
> In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF serializer.
> At a high level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output are spilled to disk and those on-disk files are merged to produce the final output file. UnsafeShuffleManager optimizes this process in several ways:
> - Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details.
> - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache.
> - The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge.
> - When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge.
> The shuffle read path is unchanged.
> This patch is similar to http://issues.apache.org/jira/browse/SPARK-4550 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators.
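For reference, a sketch of opting into the new path described above, using the settings named in the description (Kryo shown because the serializer must support relocation of serialized values):

{code}
import org.apache.spark.SparkConf

// Spark 1.4-era settings for the unsafe shuffle path.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "tungsten-sort")
  // KryoSerializer supports relocation of serialized values, one of the
  // conditions for the optimized path to apply.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
{code}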
[jira] [Commented] (SPARK-2621) Update task InputMetrics incrementally
[ https://issues.apache.org/jira/browse/SPARK-2621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14270714#comment-14270714 ]

Rui Li commented on SPARK-2621:
-------------------------------

Hey [~sandyr], it seems that after this change we require the input split to be a {{FileSplit}} to collect input metrics. Do you think it's possible to support {{CombineFileSplit}} as well? Thanks!

> Update task InputMetrics incrementally
> ---------------------------------------
>
> Key: SPARK-2621
> URL: https://issues.apache.org/jira/browse/SPARK-2621
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 1.0.0
> Reporter: Sandy Ryza
> Assignee: Sandy Ryza
> Fix For: 1.2.0
[jira] [Created] (SPARK-5080) Expose more cluster resource information to user
Rui Li created SPARK-5080:
--------------------------

Summary: Expose more cluster resource information to user
Key: SPARK-5080
URL: https://issues.apache.org/jira/browse/SPARK-5080
Project: Spark
Issue Type: Improvement
Reporter: Rui Li

It'll be useful if users can get detailed cluster resource info, e.g. granted/allocated executors, memory, and CPU. Such information is available via the WebUI, but SparkContext doesn't seem to have such APIs.
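The closest existing hook appears to be SparkContext.getExecutorMemoryStatus, which covers only block-manager memory; a sketch (the fuller executor/CPU view requested here would still need new APIs):

{code}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("resource-info"))

// Returns a map from "host:port" of each block manager to
// (maximum memory for caching, remaining memory) in bytes. This
// approximates how many executors exist and how much storage memory
// is left, but says nothing about CPU.
sc.getExecutorMemoryStatus.foreach { case (blockManager, (maxMem, remainingMem)) =>
  println(s"$blockManager: max=$maxMem bytes, remaining=$remainingMem bytes")
}
{code}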
[jira] [Commented] (SPARK-4921) Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks
[ https://issues.apache.org/jira/browse/SPARK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14256425#comment-14256425 ]

Rui Li commented on SPARK-4921:
-------------------------------

I'm not sure if this is intended, but returning process_local for no_pref tasks may reset {{currentLocalityIndex}} to 0, which may cause more delay later. It seems there's a check to avoid this, but I doubt it's sufficient:
{code}
// Update our locality level for delay scheduling
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
  currentLocalityIndex = getLocalityIndex(taskLocality)
  lastLaunchTime = curTime
}
{code}

> Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks
> -------------------------------------------------------------------------------------
>
> Key: SPARK-4921
> URL: https://issues.apache.org/jira/browse/SPARK-4921
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.2.0
> Reporter: Xuefu Zhang
> Attachments: NO_PREF.patch
>
> During research for HIVE-9153, we found that TaskSetManager returns PROCESS_LOCAL for NO_PREF tasks, which may cause performance degradation. Changing the return value to NO_PREF, as demonstrated in the attached patch, seemingly improves the performance.
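For context on the "more delay later" in the comment above: resetting currentLocalityIndex means the scheduler waits up to the locality-wait timeout at each level again before falling back. A sketch of the settings involved (values are the documented defaults, shown for illustration):

{code}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")      // wait per locality level before falling back
  .set("spark.locality.wait.node", "3s") // can also be tuned per level
  .set("spark.locality.wait.rack", "3s")
{code}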
[jira] [Commented] (SPARK-2321) Design a proper progress reporting event listener API
[ https://issues.apache.org/jira/browse/SPARK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14217485#comment-14217485 ]

Rui Li commented on SPARK-2321:
-------------------------------

Hi [~joshrosen], shall we make {{SparkJobInfo}} and {{SparkStageInfo}} serializable? Mainly for the case where the spark context runs remotely. What's your opinion?

> Design a proper progress reporting event listener API
> ------------------------------------------------------
>
> Key: SPARK-2321
> URL: https://issues.apache.org/jira/browse/SPARK-2321
> Project: Spark
> Issue Type: Improvement
> Components: Java API, Spark Core
> Affects Versions: 1.0.0
> Reporter: Reynold Xin
> Assignee: Josh Rosen
> Priority: Critical
> Fix For: 1.2.0
>
> This is a ticket to track progress on redesigning the SparkListener and JobProgressListener API. There are multiple problems with the current design, including:
> 0. I'm not sure if the API is usable in Java (there are at least some enums we used in Scala and a bunch of case classes that might complicate things).
> 1. The whole API is marked as DeveloperApi, because we haven't paid a lot of attention to it yet. Something as important as progress reporting deserves a more stable API.
> 2. There is no easy way to connect jobs with stages. Similarly, there is no easy way to connect job groups with jobs / stages.
> 3. JobProgressListener itself has no encapsulation at all. States can be arbitrarily mutated by external programs. Variable names are sort of randomly decided and inconsistent.
> We should just revisit these and propose a new, concrete design.
[jira] [Created] (SPARK-4440) Enhance the job progress API to expose more information
Rui Li created SPARK-4440:
--------------------------

Summary: Enhance the job progress API to expose more information
Key: SPARK-4440
URL: https://issues.apache.org/jira/browse/SPARK-4440
Project: Spark
Issue Type: Improvement
Reporter: Rui Li

The progress API introduced in SPARK-2321 provides a new way for users to monitor job progress. However, the information exposed in the API is relatively limited. It'll be much more useful if we can enhance the API to expose more data.

Improvements may include, but are not limited to:
1. Stage submission and completion time.
2. Task metrics.

The requirement was initially identified for the Hive on Spark project (HIVE-7292); other applications should benefit as well.
[jira] [Commented] (SPARK-2321) Design a proper progress reporting event listener API
[ https://issues.apache.org/jira/browse/SPARK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14214223#comment-14214223 ]

Rui Li commented on SPARK-2321:
-------------------------------

Hey [~joshrosen], thanks a lot for the update! I created SPARK-4440 for the enhancement.
[jira] [Commented] (SPARK-2321) Design a proper progress reporting event listener API
[ https://issues.apache.org/jira/browse/SPARK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212000#comment-14212000 ]

Rui Li commented on SPARK-2321:
-------------------------------

Hi [~joshrosen], the new API is quite useful, but the information exposed is relatively limited at the moment. Do you have any plans to enhance it? For example, submission and completion times are not available in {{SparkStageInfo}}, while they're provided in {{StageInfo}}. Thanks!
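For reference, a sketch of polling the progress API in question through SparkContext.statusTracker (task counts were available in the 1.2-era API; the timing fields discussed above were not):

{code}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("progress-poll"))

// Poll the stable progress API introduced by SPARK-2321. SparkStageInfo
// exposes task counts, but not the submission/completion times that the
// internal StageInfo carries.
for (stageId <- sc.statusTracker.getActiveStageIds) {
  sc.statusTracker.getStageInfo(stageId).foreach { stage =>
    println(s"Stage $stageId: ${stage.numCompletedTasks()}/${stage.numTasks()} tasks done")
  }
}
{code}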
[jira] [Commented] (SPARK-2636) no where to get job identifier while submit spark job through spark API
[ https://issues.apache.org/jira/browse/SPARK-2636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14110172#comment-14110172 ]

Rui Li commented on SPARK-2636:
-------------------------------

Just want to make sure I understand everything correctly: I think the user submits a job via an RDD action, which in turn calls {{SparkContext.runJob -> DAGScheduler.runJob -> DAGScheduler.submitJob -> DAGScheduler.handleJobSubmitted}}. The requirement is that we should return some job ID to the user, so I think putting that in a DAGScheduler method doesn't help? BTW, {{DAGScheduler.submitJob}} returns a {{JobWaiter}} which contains the job ID. Also, by job ID, do we mean {{org.apache.spark.streaming.scheduler.Job.id}} or {{org.apache.spark.scheduler.ActiveJob.jobId}}? Please let me know if I misunderstand anything.

> no where to get job identifier while submit spark job through spark API
> ------------------------------------------------------------------------
>
> Key: SPARK-2636
> URL: https://issues.apache.org/jira/browse/SPARK-2636
> Project: Spark
> Issue Type: New Feature
> Components: Java API
> Reporter: Chengxiang Li
> Labels: hive
>
> In Hive on Spark, we want to track spark job status through the Spark API. The basic idea is as follows:
> # create a hive-specific spark listener and register it to the spark listener bus.
> # the hive-specific spark listener generates job status from spark listener events.
> # the hive driver tracks job status through the hive-specific spark listener.
> The current problem is that the hive driver needs a job identifier to track a specific job's status through the spark listener, but there is no spark API to get a job identifier (like a job id) when submitting a spark job. I think other projects that try to track job status with the spark API would suffer from this as well.
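Regarding correlating submitted work with job ids, a sketch of one workaround available today, using a job group plus a listener (the group name is illustrative):

{code}
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Illustrative workaround: tag work with a job group, then recover the job
// ids from SparkListenerJobStart, whose properties carry the group id.
class JobGroupListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val group = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.jobGroup.id")))
    println(s"Job ${jobStart.jobId} started in group $group")
  }
}

def runTagged(sc: SparkContext): Unit = {
  sc.addSparkListener(new JobGroupListener)
  sc.setJobGroup("hive-query-1", "example query") // group id is illustrative
  sc.parallelize(1 to 100).count()
}
{code}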
[jira] [Created] (SPARK-2740) In JavaPairRdd, allow user to specify ascending and numPartitions for sortByKey
Rui Li created SPARK-2740:
--------------------------

Summary: In JavaPairRdd, allow user to specify ascending and numPartitions for sortByKey
Key: SPARK-2740
URL: https://issues.apache.org/jira/browse/SPARK-2740
Project: Spark
Issue Type: Improvement
Reporter: Rui Li
Priority: Minor

It would be more convenient if users could specify ascending and numPartitions when calling sortByKey.
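For comparison, the Scala pair-RDD API already takes both parameters; a sketch (the JIRA asks for the same overload on JavaPairRDD):

{code}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("sortbykey-example"))
val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))

// Scala's OrderedRDDFunctions.sortByKey already accepts both knobs;
// the request is to expose the same overload through JavaPairRDD.
val sortedDesc = pairs.sortByKey(ascending = false, numPartitions = 4)
println(sortedDesc.collect().mkString(", "))
{code}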
[jira] [Commented] (SPARK-2387) Remove the stage barrier for better resource utilization
[ https://issues.apache.org/jira/browse/SPARK-2387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14078798#comment-14078798 ]

Rui Li commented on SPARK-2387:
-------------------------------

Right, thanks [~joshrosen] for pointing that out. These are just some initial thoughts, anyway :)
[jira] [Commented] (SPARK-2387) Remove the stage barrier for better resource utilization
[ https://issues.apache.org/jira/browse/SPARK-2387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14054862#comment-14054862 ]

Rui Li commented on SPARK-2387:
-------------------------------

PR created at https://github.com/apache/spark/pull/1328
[jira] [Created] (SPARK-2387) Remove the stage barrier for better resource utilization
Rui Li created SPARK-2387:
--------------------------

Summary: Remove the stage barrier for better resource utilization
Key: SPARK-2387
URL: https://issues.apache.org/jira/browse/SPARK-2387
Project: Spark
Issue Type: New Feature
Components: Spark Core
Reporter: Rui Li

DAGScheduler divides a Spark job into multiple stages according to RDD dependencies. Whenever there's a shuffle dependency, DAGScheduler creates a shuffle map stage on the map side, and another stage depending on that stage. Currently, the downstream stage cannot start until all the stages it depends on have finished. This barrier between stages leads to idle slots while waiting for the last few upstream tasks to finish, and thus wastes cluster resources.

Therefore we propose to remove the barrier and pre-start the reduce stage once there are free slots. This can achieve better resource utilization and improve overall job performance, especially when there are lots of executors granted to the application.
[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack
[ https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14052103#comment-14052103 ]

Rui Li commented on SPARK-2277:
-------------------------------

With [PR #892|https://github.com/apache/spark/pull/892], we check whether a task's preference is available when adding it to the pending lists. The TaskScheduler tracks information about executors/hosts, so TaskSetManager can check whether a preferred executor/host is available. The TaskScheduler also provides getRackForHost to get the corresponding rack for a host (which currently only returns None). I think this is prior knowledge about the cluster topology, and it does not indicate whether any host on that rack has been granted to this Spark app. Therefore we don't know the availability of the preferred rack.
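For reference, a paraphrased sketch of the getRackForHost hook mentioned above: on YARN, a scheduler subclass supplies topology knowledge roughly like this (using Hadoop's RackResolver; the base implementation just returns None, and note that knowing a host's rack still doesn't say whether any alive host is on it, which is the gap this issue is about):

{code}
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl

// Paraphrased sketch of a deployment-specific override that maps a host
// name to its rack; mirrors what the YARN scheduler does.
class RackAwareScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(host: String): Option[String] =
    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
{code}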
[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack
[ https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14050951#comment-14050951 ]

Rui Li commented on SPARK-2277:
-------------------------------

Suppose task1 prefers node1, but node1 is not available at the moment. However, we know node1 is on rack1, which makes task1 prefer rack1 for RACK_LOCAL locality. The problem is, we don't know if there's an alive host on rack1, so we cannot check the availability of this preference. Please let me know if I misunderstand anything :)
[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack
[ https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14050952#comment-14050952 ]

Rui Li commented on SPARK-2277:
-------------------------------

PR created at: https://github.com/apache/spark/pull/1212
[jira] [Created] (SPARK-2277) Make TaskScheduler track whether there's host on a rack
Rui Li created SPARK-2277:
--------------------------

Summary: Make TaskScheduler track whether there's host on a rack
Key: SPARK-2277
URL: https://issues.apache.org/jira/browse/SPARK-2277
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

When TaskSetManager adds a pending task, it checks whether the task's preferred location is available. For a RACK_LOCAL task, we consider the preferred rack available if such a rack is defined for the preferred host. This is incorrect, as there may be no alive hosts on that rack at all.

Therefore, TaskScheduler should track the hosts on each rack, and provide an API for TaskSetManager to check whether there's a host alive on a specific rack.
[jira] [Created] (SPARK-1937) Tasks can be submitted before executors are registered
Rui Li created SPARK-1937:
--------------------------

Summary: Tasks can be submitted before executors are registered
Key: SPARK-1937
URL: https://issues.apache.org/jira/browse/SPARK-1937
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

During construction, TaskSetManager assigns tasks to several pending lists according to the tasks' preferred locations. If the desired location is unavailable, it assigns the task to "pendingTasksWithNoPrefs", a list containing tasks without preferred locations. The problem is that tasks may be submitted before the executors get registered with the driver, in which case TaskSetManager will assign all the tasks to pendingTasksWithNoPrefs. Later, when it looks for a task to schedule, it will pick one from this list and assign it to an arbitrary executor, since TaskSetManager considers these tasks able to run equally well on any node.

This problem forfeits the benefits of data locality, slows down the whole job, and can cause imbalance between executors. I ran into this issue when running a Spark program on a 7-node cluster (node6~node12). The program processes 100GB of data. Since the data is uploaded to HDFS from node6, this node has a complete copy of the data and, as a result, finishes tasks much faster, which in turn makes it complete disproportionately more tasks than other nodes.

To solve this issue, I think we shouldn't check the availability of executors/hosts when constructing TaskSetManager. If a task prefers a node, we simply add the task to that node's pending list. When the node is later added, TaskSetManager can schedule the task at the proper locality level. If unfortunately the preferred node(s) never get added, TaskSetManager can still schedule the task at locality level "ANY".
[jira] [Updated] (SPARK-1937) Tasks can be submitted before executors are registered
[ https://issues.apache.org/jira/browse/SPARK-1937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Li updated SPARK-1937:
--------------------------

Attachment: Before-patch.png
[jira] [Updated] (SPARK-1937) Tasks can be submitted before executors are registered
[ https://issues.apache.org/jira/browse/SPARK-1937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Li updated SPARK-1937:
--------------------------

Attachment: RSBTest.scala

The program that triggers the problem. With the patch, the overall execution time of the job drops from nearly 600s to around 250s.