[GitHub] spark pull request #16440: [SPARK-18857][SQL] Don't use `Iterator.duplicate`...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16440#discussion_r94826910

--- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
@@ -50,8 +50,8 @@ private[hive] class SparkExecuteStatementOperation(
   with Logging {

   private var result: DataFrame = _
+  private var resultList: Option[Array[SparkRow]] = _
--- End diff --

Can we document these two fields? For example: we cache the returned rows in resultList in case the user wants to use FETCH_FIRST, and this is only used when incremental collect is set to false.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
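To make the requested documentation concrete, here is a minimal, self-contained sketch of why the collected rows are cached. It is not the Thrift server's code: `CachedResult`, `FetchOrientation`, and the two row-producing functions are hypothetical stand-ins. With incremental collect off, the rows are kept in `resultList`, so FETCH_FIRST can rewind; with it on, rows come from a single-pass iterator and (in this sketch) FETCH_FIRST simply continues from the current position.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical fetch orientations (stand-ins for the Hive Thrift enum).
sealed trait FetchOrientation
case object FetchNext extends FetchOrientation
case object FetchFirst extends FetchOrientation

// Hypothetical model of the `result` / `resultList` fields under review.
final class CachedResult[T](
    collectAll: () => Array[T],     // models collecting every row at once
    iterateRows: () => Iterator[T], // models a single-pass, incremental iterator
    incrementalCollect: Boolean) {

  // Rows cached so that FETCH_FIRST can replay them.
  // Only populated when incremental collect is disabled.
  private val resultList: Option[Array[T]] =
    if (incrementalCollect) None else Some(collectAll())

  // Current read position over the result.
  private var iter: Iterator[T] =
    resultList.map(rows => rows.iterator).getOrElse(iterateRows())

  def fetch(orientation: FetchOrientation, maxRows: Int): List[T] = {
    if (orientation == FetchFirst) {
      resultList match {
        case Some(rows) => iter = rows.iterator // rewind using the cached rows
        case None => ()                         // nothing cached: cannot rewind
      }
    }
    // Pull at most maxRows rows, advancing the shared iterator.
    val batch = ListBuffer.empty[T]
    while (batch.size < maxRows && iter.hasNext) batch += iter.next()
    batch.toList
  }
}
```

The trade-off this models is memory versus replayability: caching every row makes FETCH_FIRST cheap but holds the whole result on the driver, which is exactly what incremental collect is meant to avoid.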
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r94886105

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala ---
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
   }

   private def setupPartitionedDatasourceTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
-    if (clearMetricsBeforeCreate) {
--- End diff --

Nice.
[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r94886317

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -494,8 +500,13 @@ case class DataSource(
           catalogTable = catalogTable,
           fileIndex = fileIndex)
         sparkSession.sessionState.executePlan(plan).toRdd
-        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        if (isForWriteOnly) {
+          // Exit earlier and return null
+          null
--- End diff --

Maybe we can change it to return an option?
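Returning an `Option` instead of `null` puts the write-only case into the type, so callers cannot forget to handle it. Here is a minimal sketch of the suggestion. `Relation`, `WriteSketch.writeAndResolve`, and its parameters are hypothetical stand-ins for the `DataSource` code under review, not its real signature.

```scala
// Hypothetical stand-in for the relation resolved after a write.
final case class Relation(schema: Seq[String])

object WriteSketch {
  /**
   * Writes the data (elided here) and, unless the caller only needs the write,
   * resolves and returns the relation. `None` replaces the early `null` return.
   */
  def writeAndResolve(schema: Seq[String], isForWriteOnly: Boolean): Option[Relation] = {
    // ... execute the insertion plan here (omitted in this sketch) ...
    if (isForWriteOnly) None      // caller does not need the relation: skip resolution
    else Some(Relation(schema))   // reuse the written schema instead of re-inferring it
  }
}
```

Write-only callers can ignore the result, and callers that need the relation pattern-match on it or use `getOrElse`, instead of risking a `NullPointerException` later.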
[GitHub] spark pull request #16460: [SPARK-19058][SQL] fix partition related behavior...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16460#discussion_r94456738

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -473,22 +473,26 @@ case class DataSource(
             s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
         }.asInstanceOf[Attribute]
       }
+      val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
+        sparkSession.table(tableIdent).queryExecution.analyzed.collect {
+          case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+        }.head
+      }
       // For partitioned relation r, r.schema's column ordering can be different from the column
       // ordering of data.logicalPlan (partition columns are all moved after data column). This
       // will be adjusted within InsertIntoHadoopFsRelation.
       val plan = InsertIntoHadoopFsRelationCommand(
           outputPath = outputPath,
           staticPartitions = Map.empty,
-          customPartitionLocations = Map.empty,
           partitionColumns = columns,
           bucketSpec = bucketSpec,
           fileFormat = format,
-          refreshFunction = _ => Unit, // No existing table needs to be refreshed.
           options = options,
           query = data.logicalPlan,
           mode = mode,
-          catalogTable = catalogTable)
+          catalogTable = catalogTable,
+          fileIndex = fileIndex)
--- End diff --

Should we be issuing refresh table instead of refreshing the index directly?
[GitHub] spark pull request #16460: [SPARK-19058][SQL] fix partition related behavior...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16460#discussion_r94457115

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -393,7 +393,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             // Drop the existing table
             catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
-            createTable(tableIdent)
+            createTable(tableIdentWithDB)
+            // Refresh the cache of the table in the catalog.
+            catalog.refreshTable(tableIdentWithDB)
--- End diff --

Is this already done by the insertion command?
[GitHub] spark issue #16460: [SPARK-19058][SQL] fix partition related behaviors with ...
Github user ericl commented on the issue:

https://github.com/apache/spark/pull/16460

looks good
[GitHub] spark pull request #16460: [SPARK-19058][SQL] fix partition related behavior...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16460#discussion_r94542321

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
@@ -74,12 +69,29 @@ case class InsertIntoHadoopFsRelationCommand(
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

+    val partitionsTrackedByCatalog = catalogTable.isDefined &&
+      catalogTable.get.partitionColumnNames.nonEmpty &&
+      catalogTable.get.tracksPartitionsInCatalog
--- End diff --

Yeah, we should revert to the 2.0 behavior, as if the table were being queried from 2.0.
[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r95074015

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import com.google.common.cache._
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{SerializableConfiguration, SizeEstimator}
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
+  /**
+   * @return the leaf files for the specified path from this cache, or None if not cached.
+   */
+  def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+
+  /**
+   * Saves the given set of leaf files for a path in this cache.
+   */
+  def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit
+
+  /**
+   * Invalidates all data held by this cache.
+   */
+  def invalidateAll(): Unit
+}
+
+object FileStatusCache {
+  private var sharedCache: SharedInMemoryCache = null
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   *         shared across all clients.
+   */
+  def newCache(session: SparkSession): FileStatusCache = {
+    synchronized {
+      if (session.sqlContext.conf.filesourcePartitionPruning &&
+          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
--- End diff --

Oh I see, the sizing can be problematic. I sometimes find it useful for debugging to turn it off at runtime by setting it to zero, though; that takes effect after a REFRESH TABLE. Perhaps we can just add a comment that adjusting the value at runtime may produce unexpected behavior when there are multiple sessions?
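The runtime behavior discussed here follows from the settings being read only when a cache is created. Here is a simplified model of that gating. `LeafFileCache`, `NoopCache`, `SizedCache`, and `Conf` are hypothetical names. Unlike the real code, which uses Guava's cache, Hadoop `FileStatus`, and a byte-based quota, this sketch bounds the cache by entry count. Setting the size to zero only affects caches created afterwards, for example when a table is re-resolved after REFRESH TABLE.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of FileStatusCache keyed by path strings.
trait LeafFileCache {
  def getLeafFiles(path: String): Option[Seq[String]]
  def putLeafFiles(path: String, files: Seq[String]): Unit
  def invalidateAll(): Unit
}

// Returned when caching is disabled: stores nothing, so every lookup misses.
object NoopCache extends LeafFileCache {
  def getLeafFiles(path: String): Option[Seq[String]] = None
  def putLeafFiles(path: String, files: Seq[String]): Unit = ()
  def invalidateAll(): Unit = ()
}

// Bounded by number of paths for simplicity (the real cache is bounded by estimated bytes).
final class SizedCache(maxEntries: Long) extends LeafFileCache {
  private val entries = mutable.LinkedHashMap.empty[String, Seq[String]]

  def getLeafFiles(path: String): Option[Seq[String]] = synchronized(entries.get(path))

  def putLeafFiles(path: String, files: Seq[String]): Unit = synchronized {
    entries.put(path, files)
    // Evict in insertion order once over quota.
    while (entries.size > maxEntries) entries.remove(entries.head._1)
  }

  def invalidateAll(): Unit = synchronized(entries.clear())
}

// Hypothetical session settings, read once per newCache call.
final case class Conf(partitionPruning: Boolean, cacheSize: Long)

object LeafFileCache {
  // Settings are consulted only here, so a later change applies to caches created afterwards.
  def newCache(conf: Conf): LeafFileCache =
    if (conf.partitionPruning && conf.cacheSize > 0) new SizedCache(conf.cacheSize)
    else NoopCache
}
```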
[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r95073488

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala ---
@@ -0,0 +1,149 @@
...
+  def newCache(session: SparkSession): FileStatusCache = {
+    synchronized {
+      if (session.sqlContext.conf.filesourcePartitionPruning &&
+          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
--- End diff --

Sounds good, as long as you can still change it at runtime.
[GitHub] spark pull request #16424: [SPARK-19016][SQL][DOC] Document scalable partiti...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16424#discussion_r94170820

--- Diff: docs/sql-programming-guide.md ---
@@ -526,11 +526,18 @@ By default `saveAsTable` will create a "managed table", meaning that the locatio
 be controlled by the metastore. Managed tables will also have their data deleted automatically
 when a table is dropped.

-Currently, `saveAsTable` does not expose an API supporting the creation of an "External table" from a `DataFrame`,
-however, this functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
-and location of the external table as its value (String) when saving the table with `saveAsTable`. When an External table
+Currently, `saveAsTable` does not expose an API supporting the creation of an "external table" from a `DataFrame`,
+however. This functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
+and location of the external table as its value (a string) when saving the table with `saveAsTable`. When an External table
 is dropped only its metadata is removed.

+Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive metastore. This brings several benefits:
+
+- Since full information of all partitions can be retrieved from metastore, excessive partition discovery is no longer needed. This greatly saves query planning time for partitioned tables with a large number of partitions.
--- End diff --

nit: Since the metastore can return only necessary partitions for a query, discovering all the partitions on the first query to the table is no longer needed.
[GitHub] spark pull request #16424: [SPARK-19016][SQL][DOC] Document scalable partiti...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16424#discussion_r94170722

--- Diff: docs/sql-programming-guide.md ---
@@ -515,7 +515,7 @@ new data.
 ### Saving to Persistent Tables

 `DataFrames` can also be saved as persistent tables into Hive metastore using the `saveAsTable`
-command. Notice existing Hive deployment is not necessary to use this feature. Spark will create a
+command. Notice that existing Hive deployment is not necessary to use this feature. Spark will create a
--- End diff --

nit: **an** existing Hive deployment
[GitHub] spark pull request #16424: [SPARK-19016][SQL][DOC] Document scalable partiti...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16424#discussion_r94170967

--- Diff: docs/sql-programming-guide.md ---
@@ -526,11 +526,18 @@ By default `saveAsTable` will create a "managed table", meaning that the locatio
 be controlled by the metastore. Managed tables will also have their data deleted automatically
 when a table is dropped.

-Currently, `saveAsTable` does not expose an API supporting the creation of an "External table" from a `DataFrame`,
-however, this functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
-and location of the external table as its value (String) when saving the table with `saveAsTable`. When an External table
+Currently, `saveAsTable` does not expose an API supporting the creation of an "external table" from a `DataFrame`,
+however. This functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
+and location of the external table as its value (a string) when saving the table with `saveAsTable`. When an External table
 is dropped only its metadata is removed.

+Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive metastore. This brings several benefits:
+
+- Since full information of all partitions can be retrieved from metastore, excessive partition discovery is no longer needed. This greatly saves query planning time for partitioned tables with a large number of partitions.
+- Hive DDLs such as `ALTER TABLE PARTITION ... SET LOCATION` are now available for tables created with the Datasource API.
+
+Note that partition information is not gathered by default when creating an external datasource tables (those with a `path` option). You may want to invoke `MSCK REPAIR TABLE` to trigger partition discovery and persist per-partition information into metastore before querying a created external table.
--- End diff --

s/an external/external

To sync the partition information in the metastore, you can invoke `MSCK REPAIR TABLE`.
[GitHub] spark pull request #16424: [SPARK-19016][SQL][DOC] Document scalable partiti...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16424#discussion_r94187848

--- Diff: docs/sql-programming-guide.md ---
@@ -526,11 +526,18 @@ By default `saveAsTable` will create a "managed table", meaning that the locatio
 be controlled by the metastore. Managed tables will also have their data deleted automatically
 when a table is dropped.

-Currently, `saveAsTable` does not expose an API supporting the creation of an "External table" from a `DataFrame`,
-however, this functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
-and location of the external table as its value (String) when saving the table with `saveAsTable`. When an External table
+Currently, `saveAsTable` does not expose an API supporting the creation of an "external table" from a `DataFrame`,
+however. This functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
--- End diff --

Should it say: However, this functionality?
[GitHub] spark issue #16424: [SPARK-19016][SQL][DOC] Document scalable partition hand...
Github user ericl commented on the issue:

https://github.com/apache/spark/pull/16424

LGTM, just one comment
[GitHub] spark pull request #16460: [SPARK-19058][SQL] fix partition related behavior...
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16460#discussion_r94536423

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
@@ -74,12 +69,29 @@ case class InsertIntoHadoopFsRelationCommand(
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

+    val partitionsTrackedByCatalog = catalogTable.isDefined &&
+      catalogTable.get.partitionColumnNames.nonEmpty &&
+      catalogTable.get.tracksPartitionsInCatalog
--- End diff --

Hm, in other parts of the code we assume that the feature is completely disabled when the flag is off. This is probably needed since there is no way to revert a table otherwise.
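One way to honor "completely disabled when the flag is off" is to fold the flag into the predicate itself. With the flag off, every table then takes the 2.0 code path, whatever its catalog entry says. Here is a minimal sketch. `TableMeta` and the `manageFilesourcePartitions` parameter are hypothetical stand-ins for `CatalogTable` and the session setting (presumably `spark.sql.hive.manageFilesourcePartitions`). `Option.exists` replaces the `isDefined && get` chain.

```scala
// Hypothetical stand-in for the relevant parts of CatalogTable.
final case class TableMeta(
    partitionColumnNames: Seq[String],
    tracksPartitionsInCatalog: Boolean)

object PartitionTracking {
  /**
   * True only when the feature flag is on AND the table is partitioned AND its
   * partitions are tracked in the catalog. With the flag off this is always false,
   * so the write path falls back to the 2.0 behavior for every table.
   */
  def partitionsTrackedByCatalog(
      manageFilesourcePartitions: Boolean,
      catalogTable: Option[TableMeta]): Boolean =
    manageFilesourcePartitions && catalogTable.exists { t =>
      t.partitionColumnNames.nonEmpty && t.tracksPartitionsInCatalog
    }
}
```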
[GitHub] spark issue #15539: [SPARK-17994] [SQL] Add back a file status cache for cat...
Github user ericl commented on the issue:

https://github.com/apache/spark/pull/15539

That one is safe to make global but mutable, right? It will take effect after a table is refreshed.

Most of these anomalies seem OK to me, provided we document them. It seems that to solve them we would have to cache per session, which would significantly increase the memory required.

On Sun, Jan 8, 2017, 9:32 AM Xiao Li <notificati...@github.com> wrote:

> *@gatorsmile* commented on this pull request.
> ----------
>
> In
> sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
> <https://github.com/apache/spark/pull/15539>:
>
> > +   * Invalidates all data held by this cache.
> > +   */
> > +  def invalidateAll(): Unit
> > +}
> > +
> > +object FileStatusCache {
> > +  private var sharedCache: SharedInMemoryCache = null
> > +
> > +  /**
> > +   * @return a new FileStatusCache based on session configuration. Cache memory quota is
> > +   *         shared across all clients.
> > +   */
> > +  def newCache(session: SparkSession): FileStatusCache = {
> > +    synchronized {
> > +      if (session.sqlContext.conf.filesourcePartitionPruning &&
> > +          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
>
> How about HIVE_MANAGE_FILESOURCE_PARTITIONS?
[GitHub] spark issue #15539: [SPARK-17994] [SQL] Add back a file status cache for cat...
Github user ericl commented on the issue:

https://github.com/apache/spark/pull/15539

Hm, what use cases are we trying to address? As I understand it, the worst that can happen if the cache size flag is toggled at runtime is that the old settings might still apply. And when the filesource partition management flag is changed, it only takes effect after the table entry in `cachedDataSourceTables` in HiveMetastoreCatalog is refreshed.

If we document the above, is that enough?
[GitHub] spark issue #15539: [SPARK-17994] [SQL] Add back a file status cache for cat...
Github user ericl commented on the issue:

https://github.com/apache/spark/pull/15539

Hmm, I don't think fileStatusCache can ever return incorrect results, only stale ones. Furthermore, it's scoped by client ID to particular instances of tables, so refreshing a table is guaranteed to wipe out its cache state.
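To illustrate the client-ID argument: in a shared cache where each resolved table instance gets its own client ID and keys are `(clientId, path)`, invalidating one client can only drop that client's entries. A refreshed table gets a fresh client and therefore starts empty. Here is a minimal sketch under those assumptions. `SharedLeafCache` and `ClientView` are hypothetical names, not the API of Spark's `SharedInMemoryCache`, and the memory quota is omitted.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical shared store: one pool of entries, keyed by (clientId, path).
final class SharedLeafCache {
  private val entries = new ConcurrentHashMap[(Long, String), Seq[String]]()
  private val nextId = new AtomicLong(0)

  /** Each resolved table instance gets a fresh client ID, hence its own key space. */
  def newClient(): ClientView = new ClientView(nextId.getAndIncrement())

  final class ClientView(clientId: Long) {
    def get(path: String): Option[Seq[String]] = Option(entries.get((clientId, path)))

    def put(path: String, files: Seq[String]): Unit = {
      entries.put((clientId, path), files)
      ()
    }

    /** Drops only this client's entries; other tables' entries are untouched. */
    def invalidateAll(): Unit = {
      val it = entries.keySet().iterator()
      while (it.hasNext) if (it.next()._1 == clientId) it.remove()
    }
  }
}
```

Because lookups never cross client IDs, the worst a client can see is its own earlier (possibly stale) listing, never another table's data.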
[GitHub] spark issue #16514: [SPARK-19128] [SQL] Refresh Cache after Set Location
Github user ericl commented on the issue:

https://github.com/apache/spark/pull/16514

Do you know why this check in the relation cache, which verifies that the root paths have not changed, is not sufficient?

https://github.com/apache/spark/blob/24482858e05bea84cacb41c62be0a9aaa33897ee/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L160

It seems SET LOCATION should cause the root paths to change, so the next cache lookup would invalidate the relation.
[GitHub] spark issue #16514: [SPARK-19128] [SQL] Refresh Cache after Set Location
Github user ericl commented on the issue:

https://github.com/apache/spark/pull/16514

I see. What do you think about adding that check to the caching code rather than requiring invalidation calls? After all, the SET LOCATION may be issued by a separate Spark cluster connected to the same Hive metastore, in which case you would need to invalidate the caches of all clients talking to the metastore.
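The suggestion amounts to validating a cached entry on every lookup instead of relying on explicit invalidation. That way, a SET LOCATION issued by any client, even one on another cluster, is noticed the next time the relation is read. Here is a minimal sketch. `CachedRelation`, `RelationCache`, and the `currentRootPaths` function are hypothetical, standing in for the `cachedDataSourceTables` entry and a metastore lookup.

```scala
import scala.collection.mutable

// Hypothetical cached entry: the relation plus the root paths it was built from.
final case class CachedRelation(name: String, rootPaths: Seq[String])

// `currentRootPaths` stands in for asking the metastore where the table lives now.
final class RelationCache(currentRootPaths: String => Seq[String]) {
  private val cache = mutable.Map.empty[String, CachedRelation]

  /** Returns the cached relation only if the metastore still reports the same root paths. */
  def getOrBuild(table: String)(build: Seq[String] => CachedRelation): CachedRelation =
    synchronized {
      val paths = currentRootPaths(table)
      cache.get(table) match {
        case Some(rel) if rel.rootPaths == paths => rel // location unchanged: reuse
        case _ =>
          // Missing, or the location changed (e.g. SET LOCATION from another client): rebuild.
          val rel = build(paths)
          cache.put(table, rel)
          rel
      }
    }
}
```

The cost of this design is one metastore lookup per cache access, in exchange for not needing every client to receive invalidation calls.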
[GitHub] spark issue #16350: [SPARK-18700][SQL][BACKPORT-2.0] Add StripedLock for eac...
Github user ericl commented on the issue:

https://github.com/apache/spark/pull/16350

Yeah, I don't think we need the unit test for 2.0.
[GitHub] spark issue #16326: [SPARK-18915] [SQL] Automatic Table Repair when Creating...
Github user ericl commented on the issue:

https://github.com/apache/spark/pull/16326

Oh I see, you're saying if there are old files for the partition, the INSERT INTO will cause those to become visible. That is a little confusing.
[GitHub] spark pull request #16341: [SQL] [WIP] Switch internal catalog types to use ...
GitHub user ericl opened a pull request: https://github.com/apache/spark/pull/16341 [SQL] [WIP] Switch internal catalog types to use URI instead of string for locationUri ## What changes were proposed in this pull request? This should help prevent accidental incorrect conversions between Path <-> URI. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ericl/spark fix-uris Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16341.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16341 commit 6e0cc041dfde21fd500f6b9a40dd5245d76334f7 Author: Eric Liang <e...@databricks.com> Date: 2016-12-19T22:19:34Z Mon Dec 19 14:19:34 PST 2016
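To illustrate the kind of Path <-> URI mix-up a typed `locationUri` guards against, here is a small sketch using only `java.net.URI`. `LocationUris`, `StorageFormat`, `fromPath`, and `naiveFromString` are hypothetical names, not the catalog types from the PR. Gluing a raw path onto `"file:"` treats it as already-encoded text, which rejects spaces and silently decodes any literal `%xx` in a directory name; building the URI from its components encodes the path exactly once.

```scala
import java.net.{URI, URISyntaxException}

object LocationUris {
  // Hypothetical catalog storage descriptor that keeps its location typed.
  final case class StorageFormat(locationUri: Option[URI])

  // Raw filesystem path -> URI. The multi-argument constructor percent-encodes
  // characters that are illegal in a URI path, such as spaces.
  def fromPath(path: String): URI = new URI("file", null, path, null)

  // Tempting String-based conversion: parses the text as an already-encoded URI,
  // so raw spaces are rejected and literal "%20" sequences get decoded later.
  def naiveFromString(path: String): Either[String, URI] =
    try Right(new URI("file:" + path))
    catch { case e: URISyntaxException => Left(e.getMessage) }
}
```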
[GitHub] spark issue #16326: [SPARK-18915] [SQL] Automatic Table Repair when Creating...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/16326
> hive> select * from test;
> OK
> ddd a
> c a

Isn't this showing that hive is appending to the table (ddd, a) as expected with INSERT INTO? For the (213, 0) example, is that just a bug?
[GitHub] spark issue #16122: [SPARK-18681][SQL] Fix filtering to compatible with part...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/16122 I see. In that case I think manual testing may be sufficient.

On Wed, Dec 7, 2016, 5:00 PM Michael Allman <notificati...@github.com> wrote:
> I think that's exactly what I tried and got the `NoSuchMethodException`.
>
> On Dec 7, 2016, at 3:35 PM, Eric Liang <notificati...@github.com> wrote:
>
> I did some digging into HiveClientImpl and Hive.java, and I think it would be pretty safe to call Hive.get() within the scope of a client.withHiveState { }. That call sets the thread-local hive to that used by the client before executing the passed function block.
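The pattern described in the quoted reply (install the client's state in a thread-local, run the block, then restore) can be sketched as follows. `HiveState`, `ThreadLocalHive`, and `Client.withState` are hypothetical stand-ins for Hive's per-thread session, `Hive.get()`, and `withHiveState`; this is not the actual HiveClientImpl code.

```scala
// Hypothetical stand-in for Hive's per-thread session object.
final case class HiveState(name: String)

object ThreadLocalHive {
  private val current = new ThreadLocal[HiveState] {
    override protected def initialValue(): HiveState = HiveState("default")
  }
  def get(): HiveState = current.get() // plays the role of Hive.get()
  def set(s: HiveState): Unit = current.set(s)
}

class Client(state: HiveState) {
  // Install this client's state for the duration of `body`, then restore the
  // previous value, so get() inside the block sees the client's session.
  def withState[A](body: => A): A = {
    val previous = ThreadLocalHive.get()
    ThreadLocalHive.set(state)
    try body finally ThreadLocalHive.set(previous)
  }
}
```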
[GitHub] spark issue #16135: [SPARK-18700][SQL] Add ReadWriteLock for each table's re...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/16135 Isn't it sufficient to lock around the `catalog.filterPartitions(Nil)`? Why do we need reader locks?
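A sketch of what "lock only around the expensive call" could look like: one lock object per table guards the full partition listing, readers that hit the cache take no lock, and a double-check under the lock avoids a second load. `PartitionCache` and `listAllPartitions` are hypothetical; `listAllPartitions` stands in for `catalog.filterPartitions(Nil)`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch: serialize only the "list all partitions" call per table,
// instead of putting every reader behind a ReadWriteLock.
class PartitionCache(listAllPartitions: String => Seq[String]) {
  private val locks = new ConcurrentHashMap[String, AnyRef]()
  private val cached = new ConcurrentHashMap[String, Seq[String]]()

  private def lockFor(table: String): AnyRef = {
    val fresh = new Object
    val existing = locks.putIfAbsent(table, fresh)
    if (existing == null) fresh else existing
  }

  def partitions(table: String): Seq[String] = {
    val hit = cached.get(table)
    if (hit != null) hit // fast path: no locking for readers
    else lockFor(table).synchronized {
      // Re-check under the lock: another thread may have loaded it meanwhile.
      val again = cached.get(table)
      if (again != null) again
      else {
        val loaded = listAllPartitions(table)
        cached.put(table, loaded)
        loaded
      }
    }
  }
}
```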
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r10651 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- There might be a simple solution here to avoid extra overhead on speculative tasks. We just need to check if the task index has been marked as successful -- if so, we can skip calling reviveOffers(). How does this look?
```
-if (!taskSetManager.isZombie) {
+if (!taskSetManager.isZombie && !taskSetManager.someAttemptSucceeded(tid)) {
```
Then in TaskSetManager,
```
+  def someAttemptSucceeded(tid: Long): Boolean = {
+    successful(taskInfos(tid).index)
+  }
```
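A toy model of the proposed condition, to show when reviveOffers() is skipped. `MiniTaskSetManager` and `MiniScheduler` are hypothetical simplifications, not Spark's TaskSetManager or TaskSchedulerImpl; the `taskIndex` map stands in for `taskInfos(tid).index`.

```scala
import scala.collection.mutable

// Hypothetical, simplified bookkeeping: each attempt id (tid) maps to a task index.
class MiniTaskSetManager(numTasks: Int) {
  val successful = new Array[Boolean](numTasks)
  val taskIndex = mutable.Map.empty[Long, Int] // tid -> task index
  var isZombie = false

  def someAttemptSucceeded(tid: Long): Boolean = successful(taskIndex(tid))
}

class MiniScheduler {
  var reviveCalls = 0
  def reviveOffers(): Unit = reviveCalls += 1

  // Skip reviveOffers() when another attempt of the same task already succeeded,
  // e.g. a speculative copy that was killed because the original finished.
  def handleFailedTask(tsm: MiniTaskSetManager, tid: Long): Unit =
    if (!tsm.isZombie && !tsm.someAttemptSucceeded(tid)) reviveOffers()
}
```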
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107780463 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- Great, I updated the PR to include this.
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17166 jenkins retest this please
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107534830 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- I don't think you can get a request storm. The _only_ case this guards against is exactly the one you mentioned: a speculative task that is killed because another attempt succeeded. The number of speculative tasks is always small, so this shouldn't be an issue. By comparison, we hit this code path much more often with failed tasks. If we were to add this check back, the task kill API would need an extra parameter controlling whether reviveOffers() should be called. That is substantial added complexity, which we can avoid by making this simplification at this call site. I'll leave it to @kayousterhout to decide if this is worth it.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107559290 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -296,12 +298,13 @@ private[spark] class Executor( // If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. -if (killed) { +val killReason = reasonIfKilled --- End diff -- If we assign to a temporary, there is no risk of seeing concurrent mutations of the value as we access it below (though this cannot currently happen).
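A small sketch of the "assign to a temporary" idiom: read a `@volatile` field once into a local `val`, then use only the local, so the check and the later use are guaranteed to see the same value. `KillableRunner`, `reasonIfKilled`, and `runOrReport` are hypothetical names, not Executor code.

```scala
// Hypothetical runner whose kill reason may be set by another thread.
class KillableRunner {
  @volatile var reasonIfKilled: Option[String] = None

  def runOrReport(work: () => String): String = {
    val killReason = reasonIfKilled // single volatile read
    // Both the check and the .get use the same snapshot, so a concurrent
    // write between them cannot turn a Some into a None here.
    if (killReason.isDefined) s"killed: ${killReason.get}"
    else work()
  }
}
```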
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107559342 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -239,14 +239,26 @@ private[spark] class TaskSchedulerImpl private[scheduler]( //simply abort the stage. tsm.runningTasksSet.foreach { tid => val execId = taskIdToExecutorId(tid) - backend.killTask(tid, execId, interruptThread) + backend.killTask(tid, execId, interruptThread, reason = "stage cancelled") } tsm.abort("Stage %s cancelled".format(stageId)) logInfo("Stage %d was cancelled".format(stageId)) } } } + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean = { +logInfo(s"Killing task $taskId: $reason") +val execId = taskIdToExecutorId.get(taskId) +if (execId.isDefined) { + backend.killTask(taskId, execId.get, interruptThread, reason) + true +} else { + logInfo(s"Could not kill task $taskId because no task with that ID was found.") --- End diff -- Done
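The Boolean-returning shape of `killTaskAttempt` in the diff above can be written with a pattern match on the `Option` lookup. This is a hypothetical, self-contained model: `MiniKiller` and `killOnExecutor` stand in for TaskSchedulerImpl and `backend.killTask`, and `println` stands in for `logInfo`.

```scala
import scala.collection.mutable

// Hypothetical, simplified scheduler; `killOnExecutor` stands in for the backend call.
class MiniKiller(killOnExecutor: (Long, String, Boolean, String) => Unit) {
  val taskIdToExecutorId = mutable.Map.empty[Long, String]

  // Returns false instead of throwing when the task id is unknown,
  // e.g. because the task already finished before the kill request arrived.
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean =
    taskIdToExecutorId.get(taskId) match {
      case Some(execId) =>
        killOnExecutor(taskId, execId, interruptThread, reason)
        true
      case None =>
        println(s"Could not kill task $taskId because no task with that ID was found.")
        false
    }
}
```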
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107542763 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala --- @@ -160,15 +160,20 @@ private[spark] abstract class Task[T]( // A flag to indicate whether the task is killed. This is used in case context is not yet // initialized when kill() is invoked. - @volatile @transient private var _killed = false + @volatile @transient private var _maybeKillReason: String = null protected var _executorDeserializeTime: Long = 0 protected var _executorDeserializeCpuTime: Long = 0 /** * Whether the task has been killed. */ - def killed: Boolean = _killed + def killed: Boolean = _maybeKillReason != null + + /** + * If this task has been killed, contains the reason for the kill. --- End diff -- Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107542944 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -239,14 +239,21 @@ private[spark] class TaskSchedulerImpl private[scheduler]( //simply abort the stage. tsm.runningTasksSet.foreach { tid => val execId = taskIdToExecutorId(tid) - backend.killTask(tid, execId, interruptThread) + backend.killTask(tid, execId, interruptThread, reason = "stage cancelled") } tsm.abort("Stage %s cancelled".format(stageId)) logInfo("Stage %d was cancelled".format(stageId)) } } } + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { +logInfo(s"Killing task ($reason): $taskId") --- End diff -- Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107543449 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -239,14 +239,21 @@ private[spark] class TaskSchedulerImpl private[scheduler]( //simply abort the stage. tsm.runningTasksSet.foreach { tid => val execId = taskIdToExecutorId(tid) - backend.killTask(tid, execId, interruptThread) + backend.killTask(tid, execId, interruptThread, reason = "stage cancelled") } tsm.abort("Stage %s cancelled".format(stageId)) logInfo("Stage %d was cancelled".format(stageId)) } } } + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { +logInfo(s"Killing task ($reason): $taskId") +val execId = taskIdToExecutorId.getOrElse( + taskId, throw new IllegalArgumentException("Task not found: " + taskId)) --- End diff -- Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107541631 --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala --- @@ -59,8 +59,8 @@ private[spark] class TaskContextImpl( /** List of callback functions to execute when the task fails. */ @transient private val onFailureCallbacks = new ArrayBuffer[TaskFailureListener] - // Whether the corresponding task has been killed. - @volatile private var interrupted: Boolean = false + // If defined, the corresponding task has been killed for the contained reason. + @volatile private var maybeKillReason: Option[String] = None --- End diff -- Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107542020 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala --- @@ -215,7 +215,7 @@ private[spark] class PythonRunner( case e: Exception if context.isInterrupted => logDebug("Exception thrown after task interruption", e) -throw new TaskKilledException +throw new TaskKilledException(context.getKillReason().getOrElse("unknown reason")) --- End diff -- @mridulm pointed out that this would crash if a concurrent thread reset the kill reason to None. However, that can't happen in the current implementation. If you think it's clearer, we could throw an AssertionError in this case.
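The two options under discussion, side by side: fall back to a placeholder reason with `getOrElse`, or treat a missing reason as a bug and fail with an `AssertionError`. `KilledException` and `KillReasons` are hypothetical stand-ins, not Spark's `TaskKilledException`. For contrast, calling `.get` on a `None` would throw `NoSuchElementException`, which is the crash mentioned above.

```scala
// Hypothetical exception carrying the kill reason.
final class KilledException(val reason: String) extends RuntimeException(reason)

object KillReasons {
  // Lenient: never crashes, even if the reason was cleared concurrently.
  def lenient(maybeReason: Option[String]): KilledException =
    new KilledException(maybeReason.getOrElse("unknown reason"))

  // Strict: a missing reason signals a bookkeeping bug, so fail loudly.
  def strict(maybeReason: Option[String]): KilledException = maybeReason match {
    case Some(r) => new KilledException(r)
    case None => throw new AssertionError("task was interrupted but no kill reason was set")
  }
}
```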
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107542655 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala --- @@ -160,15 +160,20 @@ private[spark] abstract class Task[T]( // A flag to indicate whether the task is killed. This is used in case context is not yet // initialized when kill() is invoked. - @volatile @transient private var _killed = false + @volatile @transient private var _maybeKillReason: String = null --- End diff -- Done
[GitHub] spark pull request #17475: [SPARK-20148] [SQL] Extend the file commit API to...
GitHub user ericl opened a pull request: https://github.com/apache/spark/pull/17475 [SPARK-20148] [SQL] Extend the file commit API to allow subscribing to task commit messages ## What changes were proposed in this pull request? The internal FileCommitProtocol interface returns all task commit messages in bulk to the implementation when a job finishes. However, it is sometimes useful to access those messages as individual tasks commit, so that the driver gets incremental progress updates before the job finishes. This adds an `onTaskCommit` listener to the internal API. ## How was this patch tested? Unit tests. cc @rxin You can merge this pull request into a Git repository by running: $ git pull https://github.com/ericl/spark file-commit-api-ext Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17475.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17475 commit a541fdd34d71656c6932eadb3edad9b782a1ae22 Author: Eric Liang <e...@databricks.com> Date: 2017-03-29T23:16:40Z initial commit
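A simplified model of the proposed hook: a per-task callback with a no-op default, invoked on the driver as each task commits, followed by the existing bulk `commitJob` call. `MiniCommitProtocol`, `TaskCommitMessage`, and `MiniDriver.runJob` are hypothetical stand-ins, not Spark's FileCommitProtocol classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical per-task commit message.
final case class TaskCommitMessage(taskId: Int, files: Seq[String])

abstract class MiniCommitProtocol {
  // Existing bulk path: every task's message, delivered when the job finishes.
  def commitJob(messages: Seq[TaskCommitMessage]): Unit
  // New hook: called as each task commits; a no-op unless overridden.
  def onTaskCommit(message: TaskCommitMessage): Unit = {}
}

object MiniDriver {
  // Forward each message as soon as it arrives, then commit the job in bulk.
  def runJob(protocol: MiniCommitProtocol, results: Iterator[TaskCommitMessage]): Unit = {
    val all = ArrayBuffer.empty[TaskCommitMessage]
    results.foreach { m =>
      protocol.onTaskCommit(m)
      all += m
    }
    protocol.commitJob(all.toList)
  }
}
```

Keeping the default a no-op means existing implementations are unaffected; only implementations that want incremental progress override it.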
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17166 Rebased
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17166 jenkins retest this please
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107070379 --- Diff: core/src/main/scala/org/apache/spark/ui/UIUtils.scala --- @@ -354,7 +354,7 @@ private[spark] object UIUtils extends Logging { {completed}/{total} { if (failed > 0) s"($failed failed)" } { if (skipped > 0) s"($skipped skipped)" } -{ if (killed > 0) s"($killed killed)" } +{ reasonToNumKilled.map { case (reason, count) => s"($count killed: $reason)" } } --- End diff -- Done
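The UI change renders one label per distinct kill reason instead of a single total. A standalone sketch of that formatting, with a hypothetical `KilledSummary.labels` helper; the sort by descending count is an added assumption for deterministic output, since the diff maps over the map directly.

```scala
// Hypothetical helper: one "(N killed: reason)" label per distinct kill reason.
object KilledSummary {
  def labels(reasonToNumKilled: Map[String, Int]): Seq[String] =
    reasonToNumKilled.toSeq
      .sortBy { case (reason, count) => (-count, reason) } // assumed ordering
      .map { case (reason, count) => s"($count killed: $reason)" }
}
```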
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107070457 --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala --- @@ -540,6 +540,39 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } } + // Launches one task that will run forever. Once the SparkListener detects the task has + // started, kill and re-schedule it. The second run of the task will complete immediately. + // If this test times out, then the first version of the task wasn't killed successfully. + test("Killing tasks") { +sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + +SparkContextSuite.isTaskStarted = false +SparkContextSuite.taskKilled = false + +val listener = new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { +eventually(timeout(10.seconds)) { + assert(SparkContextSuite.isTaskStarted) +} +if (!SparkContextSuite.taskKilled) { + SparkContextSuite.taskKilled = true + sc.killTaskAttempt(taskStart.taskInfo.taskId, true, "first attempt will hang") +} + } --- End diff -- Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107070597 --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala --- @@ -540,6 +540,39 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } } + // Launches one task that will run forever. Once the SparkListener detects the task has + // started, kill and re-schedule it. The second run of the task will complete immediately. + // If this test times out, then the first version of the task wasn't killed successfully. + test("Killing tasks") { +sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + +SparkContextSuite.isTaskStarted = false +SparkContextSuite.taskKilled = false + +val listener = new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { +eventually(timeout(10.seconds)) { + assert(SparkContextSuite.isTaskStarted) +} +if (!SparkContextSuite.taskKilled) { + SparkContextSuite.taskKilled = true + sc.killTaskAttempt(taskStart.taskInfo.taskId, true, "first attempt will hang") +} + } +} +sc.addSparkListener(listener) +eventually(timeout(20.seconds)) { + sc.parallelize(1 to 1).foreach { x => +// first attempt will hang +if (!SparkContextSuite.isTaskStarted) { + SparkContextSuite.isTaskStarted = true + Thread.sleep(999) +} +// second attempt succeeds immediately + } --- End diff -- `foreach` is an action so actually it does run, but I added the verification just in case.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107070165 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- This shouldn't be a behavior change, just a simplification of the logic to always call reviveOffers(). Whether or not the task is rescheduled is decided independently of this call; it's just nice to always call reviveOffers() so that the task won't sit pending until the next round of offers.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107073269 --- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala --- @@ -212,8 +212,8 @@ case object TaskResultLost extends TaskFailedReason { * Task was killed intentionally and needs to be rescheduled. */ @DeveloperApi -case object TaskKilled extends TaskFailedReason { - override def toErrorString: String = "TaskKilled (killed intentionally)" +case class TaskKilled(reason: String) extends TaskFailedReason { + override def toErrorString: String = s"TaskKilled ($reason)" --- End diff -- This is unfortunately not backwards compatible. I've looked into this, but the issue seems to be that case objects are not equal to any case class in Scala. If `TaskKilled` had been a case class to start with, compatibility might have been possible.
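A small illustration of why the change is not backwards compatible: a caller written against the old singleton matches by equality with that object, and no instance of the new case class is ever equal to it. `FailedReason`, `OldApi`, `NewApi`, and `Callers` are hypothetical types, not Spark's TaskEndReason hierarchy; the two `TaskKilled` definitions live in separate objects only so that both can coexist in one example.

```scala
// Hypothetical reason hierarchy for illustration only.
sealed trait FailedReason

object OldApi {
  case object TaskKilled extends FailedReason // before: a singleton with no payload
}

object NewApi {
  final case class TaskKilled(reason: String) extends FailedReason // after: carries a reason
}

object Callers {
  // Old-style caller: matches the singleton by equality.
  def describeOld(r: FailedReason): String = r match {
    case OldApi.TaskKilled => "killed"
    case _ => "other"
  }

  // New-style caller: must destructure (or type-test) the case class instead.
  def describeNew(r: FailedReason): String = r match {
    case NewApi.TaskKilled(reason) => s"killed: $reason"
    case _ => "other"
  }
}
```

An old-style clause therefore falls through to the default branch for the new value, which is the silent behavior change the comment is worried about.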
[GitHub] spark pull request #16341: [SQL] [WIP] Switch internal catalog types to use ...
Github user ericl closed the pull request at: https://github.com/apache/spark/pull/16341
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17166 Test failure seems unrelated. jenkins retest this please
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107274054 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- There is no need, but reviving offers has no effect either way. Those tasks will not be resubmitted even if reviveOffers() is called. (In fact, reviveOffers() is called periodically on a timer thread, so if this were an issue we would already have seen it.)
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107271262

--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -140,16 +140,22 @@ private[spark] class TaskContextImpl(
   }
   /** Marks the task for interruption, i.e. cancellation. */
-  private[spark] def markInterrupted(): Unit = {
-    interrupted = true
+  private[spark] def markInterrupted(reason: String): Unit = {
+    maybeKillReason = Some(reason)
+  }
+
+  private[spark] override def killTaskIfInterrupted(): Unit = {
+    if (maybeKillReason.isDefined) {
+      throw new TaskKilledException(maybeKillReason.get)
--- End diff --

Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107273498

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -160,15 +160,20 @@ private[spark] abstract class Task[T](
   // A flag to indicate whether the task is killed. This is used in case context is not yet
   // initialized when kill() is invoked.
-  @volatile @transient private var _killed = false
+  @volatile @transient private var _maybeKillReason: String = null
--- End diff --

This one gets deserialized to null sometimes, so it seemed cleaner to use a bare string.
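The comment above relies on how Java serialization treats @transient fields. They are skipped on write and come back as null on read, even when the field has an initializer such as `= None`. The minimal sketch below uses hypothetical stand-in classes (OptionFieldTask, BareStringTask and RoundTrip are illustrative names, not Spark code). It shows that an Option field can still end up null, while a bare nullable String wrapped with Option(...) in its accessor stays safe:

```scala
import java.io._

// Hypothetical stand-in: a transient Option field with an initializer.
class OptionFieldTask extends Serializable {
  // Java deserialization does not run this initializer for a transient field,
  // so after a round trip the field holds null rather than None.
  @volatile @transient var maybeKillReason: Option[String] = None
}

// Hypothetical stand-in: a bare nullable String, converted to Option at the edge.
class BareStringTask extends Serializable {
  @volatile @transient private var _maybeKillReason: String = null
  def kill(reason: String): Unit = { _maybeKillReason = reason }
  // Option(null) == None, so callers never see null.
  def reasonIfKilled: Option[String] = Option(_maybeKillReason)
}

object RoundTrip {
  // Serialize and deserialize with plain Java serialization.
  def apply[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    // Resolve classes with this object's loader so the sketch also works in script runners.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, RoundTrip.getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```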
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107273296

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -302,12 +298,12 @@ private[spark] class Executor(
       // If this task has been killed before we deserialized it, let's quit now. Otherwise,
       // continue executing the task.
-      if (killed) {
+      if (maybeKillReason.isDefined) {
         // Throw an exception rather than returning, because returning within a try{} block
         // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
         // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
         // for the task.
-        throw new TaskKilledException
+        throw new TaskKilledException(maybeKillReason.get)
--- End diff --

Fixed
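The code comment in this diff explains why the executor throws instead of returning. The minimal sketch below shows the mechanism with hypothetical names (runTask and KilledByDriver are illustrative, not Spark's API). A `return` inside a closure is compiled to a thrown scala.runtime.NonLocalReturnControl, and a catch-all handler then reports it as a failure. A dedicated exception, by contrast, can be matched first and reported as a kill. Recent Scala 3 versions also emit a deprecation warning for the non-local return used here.

```scala
object NonLocalReturnSketch {
  // Hypothetical exception used to signal a driver-requested kill.
  final class KilledByDriver(val reason: String) extends RuntimeException(reason)

  // Hypothetical task runner: the kill exception is matched first,
  // and any other Throwable is reported as a failure.
  def runTask(body: => Unit): String =
    try {
      body
      "SUCCESS"
    } catch {
      case k: KilledByDriver => s"KILLED (${k.reason})"
      case t: Throwable      => s"FAILED (${t.getClass.getName})"
    }

  // Early exit via `return`: inside the by-name block this is a non-local return,
  // implemented by throwing NonLocalReturnControl, which runTask's catch-all swallows.
  def killByReturning(killed: Boolean): String =
    runTask {
      if (killed) return "never observed by the caller"
    }

  // Early exit via a dedicated exception: runTask can classify it correctly.
  def killByThrowing(reason: String): String =
    runTask {
      throw new KilledByDriver(reason)
    }
}
```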
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107272852

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -215,7 +215,8 @@ private[spark] class PythonRunner(
         case e: Exception if context.isInterrupted =>
           logDebug("Exception thrown after task interruption", e)
-          throw new TaskKilledException
+          context.killTaskIfInterrupted()
+          null // not reached
--- End diff --

Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107271185

--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -59,8 +59,8 @@ private[spark] class TaskContextImpl(
   /** List of callback functions to execute when the task fails. */
   @transient private val onFailureCallbacks = new ArrayBuffer[TaskFailureListener]
-  // Whether the corresponding task has been killed.
-  @volatile private var interrupted: Boolean = false
+  // If defined, the corresponding task has been killed for the contained reason.
+  @volatile private var maybeKillReason: Option[String] = None
--- End diff --

Yeah, the reason here is to allow this to be set atomically.
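One reading of "set atomically" here is that a single @volatile reference holds both the killed flag and its reason. A reader can then never observe one without the other, which could happen with separate `interrupted` and `reason` fields. The sketch below uses a hypothetical KillableContext class modeled on the methods in these diffs (markInterrupted, killTaskIfInterrupted). It is not Spark's TaskContextImpl, and reading the field once into a local is an extra choice made here.

```scala
// Hypothetical exception carrying the kill reason.
final class TaskKilledSketchException(val reason: String)
  extends RuntimeException(s"task killed: $reason")

// Hypothetical, simplified task context.
class KillableContext {
  // One volatile write publishes "killed" and "why" together.
  @volatile private var maybeKillReason: Option[String] = None

  def markInterrupted(reason: String): Unit = { maybeKillReason = Some(reason) }

  def isInterrupted: Boolean = maybeKillReason.isDefined

  // Read the volatile field once so the check and the throw use the same value.
  def killTaskIfInterrupted(): Unit = {
    val reason = maybeKillReason
    if (reason.isDefined) throw new TaskKilledSketchException(reason.get)
  }
}
```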
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106066177

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -710,7 +710,11 @@ private[spark] class TaskSetManager(
         logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
           s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
           s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
-        sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
+        sched.backend.killTask(
+          attemptInfo.taskId,
+          attemptInfo.executorId,
+          interruptThread = true,
+          reason = "another attempt succeeded")
--- End diff --

I added two screenshots to the PR description. In the second scenario a verbose reason is fine, but in the stage summary view, long reasons or many distinct reasons would overflow the progress bar. We could probably fix the CSS to allow slightly longer or more numerous reasons, but even that wouldn't be great if each task had a different reason.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106051633

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -2250,6 +2250,25 @@ class SparkContext(config: SparkConf) extends Logging {
   }
   /**
+   * Kill a given task. It will be retried.
+   *
+   * @param taskId the task ID to kill
+   */
+  def killTask(taskId: Long): Unit = {
+    killTask(taskId, "cancelled")
--- End diff --

The only issue here is that the UI is not great at rendering long strings (it tends to cut them off). I'd prefer to keep it concise for now.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106051490

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -2250,6 +2250,25 @@ class SparkContext(config: SparkConf) extends Logging {
   }
   /**
+   * Kill a given task. It will be retried.
--- End diff --

Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106051697

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -2250,6 +2250,25 @@ class SparkContext(config: SparkConf) extends Logging {
   }
   /**
+   * Kill a given task. It will be retried.
+   *
+   * @param taskId the task ID to kill
+   */
+  def killTask(taskId: Long): Unit = {
+    killTask(taskId, "cancelled")
+  }
+
+  /**
+   * Kill a given task. It will be retried.
+   *
+   * @param taskId the task ID to kill
+   * @param reason the reason for killing the task, which should be a short string
+   */
+  def killTask(taskId: Long, reason: String): Unit = {
--- End diff --

> What is the expectation when a task is being killed.
> Is it specifically for the task being referenced; or all attempts of the task ?

The current task attempt (which is uniquely identified by the task ID). I updated the docs as suggested here.

> "killAndRescheduleTask" implies it will be rescheduled - which might not occur in case this was a speculative task (or already completed) : would be good to clarify.

Went with killTaskAttempt.

> Is this expected to be exposed via the UI ?
> How is it to be leveraged (if not via UI) ?

For now, you can look at the Spark UI, find the task ID, and call killTaskAttempt on it. It would be nice to have this as a button on the executor page in a follow-up. You can also have a listener that kills tasks as suggested.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106060305

--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -538,10 +538,37 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }
+  test("Killing tasks") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+
+    SparkContextSuite.isTaskStarted = false
+    SparkContextSuite.taskKilled = false
+
--- End diff --

Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106054370

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala ---
@@ -64,7 +64,7 @@ private[spark] object UIData {
     var numCompletedTasks: Int = 0,
     var numSkippedTasks: Int = 0,
     var numFailedTasks: Int = 0,
-    var numKilledTasks: Int = 0,
+    var numKilledTasks: Map[String, Int] = Map.empty,
--- End diff --

Done
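With numKilledTasks changed to a Map[String, Int], kills are counted per reason. This is also why short, reusable reason strings keep the progress-bar summary compact: every distinct reason adds another entry. Below is a minimal sketch with hypothetical helpers (recordKill and render are illustrative, and the summary format is an assumption, not Spark's UI code):

```scala
object KilledTaskSummary {
  // Increment the counter for one kill reason in an immutable per-reason map.
  def recordKill(counts: Map[String, Int], reason: String): Map[String, Int] =
    counts.updated(reason, counts.getOrElse(reason, 0) + 1)

  // Render a compact summary: most frequent reasons first, ties broken alphabetically.
  def render(counts: Map[String, Int]): String =
    counts.toSeq
      .sortBy { case (reason, n) => (-n, reason) }
      .map { case (reason, n) => s"$n killed: $reason" }
      .mkString(", ")
}
```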
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106060636

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala ---
@@ -104,7 +104,8 @@ private[spark] class MesosExecutorBackend
       logError("Received KillTask but executor was null")
     } else {
       // TODO: Determine the 'interruptOnCancel' property set for the given job.
-      executor.killTask(t.getValue.toLong, interruptThread = false)
+      executor.killTask(
+        t.getValue.toLong, interruptThread = false, reason = "killed intentionally")
--- End diff --

Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106053942

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala ---
@@ -54,6 +54,9 @@ private[spark] trait TaskScheduler {
   // Cancel a stage.
   def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
+  // Kill a task.
--- End diff --

Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106053125

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -239,8 +244,9 @@ private[spark] class Executor(
      */
     @volatile var task: Task[Any] = _
-    def kill(interruptThread: Boolean): Unit = {
-      logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
+    def kill(interruptThread: Boolean, reason: String): Unit = {
+      logInfo(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason")
--- End diff --

Which paren do you mean here?
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106054145

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -710,7 +710,11 @@ private[spark] class TaskSetManager(
         logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
           s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
           s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
-        sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
+        sched.backend.killTask(
+          attemptInfo.taskId,
+          attemptInfo.executorId,
+          interruptThread = true,
+          reason = "another attempt succeeded")
--- End diff --

As above, this would cause the progress bar to overflow in the UI. I think we should stick to short strings for now; if users find this particularly useful, we can add a long-form reason in another PR.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106052824

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -168,7 +168,8 @@ private[spark] class Executor(
       case Some(existingReaper) => interruptThread && !existingReaper.interruptThread
     }
     if (shouldCreateReaper) {
-      val taskReaper = new TaskReaper(taskRunner, interruptThread = interruptThread)
+      val taskReaper = new TaskReaper(
+        taskRunner, interruptThread = interruptThread, reason = reason)
--- End diff --

If a task is killed more than once with different reasons, I think it's reasonable to show one (arbitrary) reason, since this should be a rare situation. I also updated the killTaskAttempt doc comment to reflect this.
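When one task attempt receives several kill requests with different reasons, only one reason needs to survive. The minimal sketch below assumes a first-reason-wins policy, built on java.util.concurrent.atomic.AtomicReference.compareAndSet. The comment above only promises one arbitrary reason, so this is one possible policy rather than Spark's behavior, and KillReasonHolder is a hypothetical name.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical holder: concurrent kill requests race, and exactly one reason is kept.
class KillReasonHolder {
  private val reason = new AtomicReference[String](null)

  // Returns true only for the request whose reason was recorded (the first one).
  def requestKill(r: String): Boolean = reason.compareAndSet(null, r)

  // The single reason that will be reported, if any kill was requested.
  def reported: Option[String] = Option(reason.get)
}
```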
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106060313

--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -538,10 +538,37 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }
+  test("Killing tasks") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+
+    SparkContextSuite.isTaskStarted = false
+    SparkContextSuite.taskKilled = false
+
+    val listener = new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        eventually(timeout(10.seconds)) {
+          assert(SparkContextSuite.isTaskStarted)
+        }
+        if (!SparkContextSuite.taskKilled) {
+          SparkContextSuite.taskKilled = true
+          sc.killTask(taskStart.taskInfo.taskId, "first attempt will hang")
+        }
+      }
+    }
+    sc.addSparkListener(listener)
+    sc.parallelize(1 to 1).foreach { x =>
--- End diff --

Done
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106053726

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -732,6 +732,13 @@ class DAGScheduler(
   }
   /**
+   * Kill a given task. It will be retried.
+   */
+  def killTask(taskId: Long, reason: String): Unit = {
+    taskScheduler.killTask(taskId, true, reason)
+  }
--- End diff --

I added this (interruptThread) as a parameter to the public API, defaulting to true. It might be nice to pull the default from the job properties, but I didn't see a clean way to do this.
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17166

Drilling down into the detail view is kind of cumbersome. I think it's most useful to have a good summary at the progress bar; the user can then refer to the logs for detailed per-task debugging.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106555729

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -2250,6 +2250,22 @@ class SparkContext(config: SparkConf) extends Logging {
   }
   /**
+   * Kill and reschedule the given task attempt. Task ids can be obtained from the Spark UI
+   * or through SparkListener.onTaskStart.
+   *
+   * @param taskId the task ID to kill. This id uniquely identifies the task attempt.
+   * @param interruptThread whether to interrupt the thread running the task.
+   * @param reason the reason for killing the task, which should be a short string. If a task
+   *   is killed multiple times with different reasons, only one reason will be reported.
+   */
+  def killTaskAttempt(
+      taskId: Long,
+      interruptThread: Boolean = true,
+      reason: String = "cancelled"): Unit = {
--- End diff --

Done
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17166

Made the change to improve the default reason, which now says "killed via SparkContext.killTaskAttempt".
[GitHub] spark pull request #17531: [SPARK-20217][core] Executor should not fail stag...
GitHub user ericl opened a pull request: https://github.com/apache/spark/pull/17531

[SPARK-20217][core] Executor should not fail stage if killed task throws non-interrupted exception

## What changes were proposed in this pull request?

If tasks throw non-interrupted exceptions on kill (e.g. java.nio.channels.ClosedByInterruptException), their death is reported back as TaskFailed instead of TaskKilled. This causes stage failure in some cases.

This is reproducible as follows. Run the following, and then use SparkContext.killTaskAttempt to kill one of the tasks. The entire stage will fail since we threw a RuntimeException instead of InterruptedException.

We should probably unconditionally return TaskKilled instead of TaskFailed if the task was killed by the driver, regardless of the actual exception thrown.

```
spark.range(100).repartition(100).foreach { i =>
  try {
    Thread.sleep(1000)
  } catch {
    case t: InterruptedException =>
      throw new RuntimeException(t)
  }
}
```

Based on the code in TaskSetManager, I think this also affects kills of speculative tasks. However, since speculated tasks are few, and a task usually has to fail a few times before the stage is cancelled, probably no one noticed this in production.

## How was this patch tested?

Unit test. The test fails before the change in Executor.scala.

cc @JoshRosen

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ericl/spark fix-task-interrupt

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17531.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17531

commit 8f6283a7c407d28c043523d91b8c3a24da0eff52
Author: Eric Liang <e...@databricks.com>
Date: 2017-04-04T23:52:51Z
Tue Apr 4 16:52:51 PDT 2017

commit 9d59960626178acb68918f1fce1a4f85b0da7493
Author: Eric Liang <e...@databricks.com>
Date: 2017-04-05T00:04:06Z
Tue Apr 4 17:04:06 PDT 2017
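The proposed rule can be written as a small decision function. This is a hypothetical model, not Spark's executor code (TaskEndState, Killed, Failed, Succeeded and classify are illustrative names). If the driver requested a kill, the outcome is reported as killed no matter which exception the task body threw, including a user wrapper such as the RuntimeException in the reproduction above.

```scala
// Hypothetical task end states (not Spark's TaskState / TaskEndReason types).
sealed trait TaskEndState
final case class Killed(reason: String) extends TaskEndState
final case class Failed(error: Throwable) extends TaskEndState
case object Succeeded extends TaskEndState

object TaskOutcome {
  // reasonIfKilled: set when the driver asked for a kill; error: what the task body threw, if anything.
  def classify(reasonIfKilled: Option[String], error: Option[Throwable]): TaskEndState =
    (reasonIfKilled, error) match {
      // Any exception after a kill request counts as a kill, not a failure.
      case (Some(reason), Some(_)) => Killed(reason)
      case (None, Some(t))         => Failed(t)
      // The body finished normally (possibly racing with a late kill request).
      case (_, None)               => Succeeded
    }
}
```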
[GitHub] spark pull request #17531: [SPARK-20217][core] Executor should not fail stag...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17531#discussion_r109998390

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,7 +432,7 @@ private[spark] class Executor(
         setTaskFinishedAndClearInterruptStatus()
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
-      case _: InterruptedException if task.reasonIfKilled.isDefined =>
+      case _: Throwable if task.reasonIfKilled.isDefined =>
--- End diff --

I think NonFatal is probably fine here; an OOM could presumably cause these messages to be dropped and the task to be marked as failed anyway.
[GitHub] spark issue #17659: [SPARK-20358] [core] Executors failing stage on interrup...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17659

Ping.
[GitHub] spark pull request #17659: [SPARK-20358] [core] Executors failing stage on i...
GitHub user ericl opened a pull request: https://github.com/apache/spark/pull/17659

[SPARK-20358] [core] Executors failing stage on interrupted exception thrown by cancelled tasks

## What changes were proposed in this pull request?

This was a regression introduced by my earlier PR here: https://github.com/apache/spark/pull/17531

It turns out NonFatal() does not in fact catch InterruptedException.

## How was this patch tested?

Extended cancellation unit test coverage. The first test fails before this patch.

cc @JoshRosen @mridulm

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ericl/spark spark-20358

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17659.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17659

commit 607e11aafe507b6f9355c94c9c9a27fd15014928
Author: Eric Liang <e...@databricks.com>
Date: 2017-04-17T19:14:02Z
Mon Apr 17 12:14:01 PDT 2017
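The root cause is that scala.util.control.NonFatal deliberately treats InterruptedException as fatal, alongside VirtualMachineError, ThreadDeath, LinkageError and ControlThrowable. The minimal sketch below contrasts a handler that matches only NonFatal(_) with one that also lists InterruptedException explicitly, guarded by a killed flag. KillCatchSketch is a hypothetical name, and the explicit extra case is one way to close the gap, not necessarily the patch's exact code.

```scala
import scala.util.control.NonFatal

object KillCatchSketch {
  // Matches what a bare `case NonFatal(_)` handler would catch.
  def caughtByNonFatalOnly(t: Throwable): Boolean = t match {
    case NonFatal(_) => true
    case _           => false // InterruptedException ends up here
  }

  // Treat the exception as a kill if one was requested: InterruptedException is listed
  // explicitly because NonFatal excludes it; truly fatal errors still fall through.
  def caughtWhenKilled(t: Throwable, killed: Boolean): Boolean = t match {
    case _: InterruptedException if killed => true
    case NonFatal(_) if killed              => true
    case _                                  => false
  }
}
```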
[GitHub] spark issue #15398: [SPARK-17647][SQL] Fix backslash escaping in 'LIKE' patt...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/15398

This seems to have broken the build in branch-2.1, e.g. https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-branch-2.1-compile-maven-hadoop-2.6/591/consoleFull

```
[info] Compiling 196 Scala sources and 26 Java sources to /home/jenkins/workspace/spark-branch-2.1-compile-maven-hadoop-2.6/sql/catalyst/target/scala-2.11/classes...
[error] /home/jenkins/workspace/spark-branch-2.1-compile-maven-hadoop-2.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:1107: not found: value parser
[error] Try(parser.parseFunctionIdentifier(f)) match {
[error] ^
[error] /home/jenkins/workspace/spark-branch-2.1-compile-maven-hadoop-2.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:1118: value funcName is not a member of Any
[error] case f if FunctionRegistry.functionSet.contains(f.funcName) => (f, "SYSTEM")
[error] ^
[error] /home/jenkins/workspace/spark-branch-2.1-compile-maven-hadoop-2.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:1120: type mismatch;
[error] found : Seq[(Any, String)]
[error] required: Seq[(org.apache.spark.sql.catalyst.FunctionIdentifier, String)]
[error] }.distinct
[error] ^
[error] /home/jenkins/workspace/spark-branch-2.1-compile-maven-hadoop-2.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala:95: illegal inheritance;
[error] self-type org.apache.spark.sql.catalyst.expressions.Like does not conform to org.apache.spark.sql.catalyst.expressions.StringRegexExpression's selftype org.apache.spark.sql.catalyst.expressions.StringRegexExpression with org.apache.spark.sql.catalyst.expressions.BinaryExpression
[error] case class Like(left: Expression, right: Expression) extends StringRegexExpression {
[error] ^
[error] /home/jenkins/workspace/spark-branch-2.1-compile-maven-hadoop-2.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala:134: not found: value nullSafeCodeGen
[error] nullSafeCodeGen(ctx, ev, (eval1, eval2) => {
[error] ^
[error] 5 errors found
[error] Compile failed at Apr 17, 2017 12:06:12 PM [13.382s]
```
[GitHub] spark issue #17623: [SPARK-20292][SQL] Clean up string representation of Tre...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17623 Thanks for doing this. We recently hit an issue where O(n^2)-sized expression tree strings crashed the cluster and created many hundreds of gigabytes of log files. Could we also add a unit test checking that expression tree strings don't exhibit this O(n^2) behavior?
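One possible shape for such a test: the quadratic blow-up comes from every node's line embedding the full string of its subtree, so a check can assert that a node's own line stays the same size regardless of how deep the tree below it is. This is a minimal sketch; `Node`, `simpleString`, `treeString`, and `chain` are hypothetical stand-ins loosely named after, but not identical to, Catalyst's `TreeNode` API. Indentation is left out of the check because it grows with depth by design.

```scala
// Hypothetical stand-in for an expression tree node; not Catalyst's TreeNode.
final case class Node(name: String, children: Seq[Node] = Nil) {
  // One-line description of this node only: names the operator and the number
  // of children, but deliberately does NOT embed the children's strings.
  def simpleString: String =
    if (children.isEmpty) name else s"$name(${children.size} children)"

  // Multi-line tree string: one line per node, indented by depth.
  def treeString: String = {
    val sb = new StringBuilder
    def loop(n: Node, depth: Int): Unit = {
      sb.append("  " * depth).append(n.simpleString).append('\n')
      n.children.foreach(loop(_, depth + 1))
    }
    loop(this, 0)
    sb.toString
  }
}

object TreeStringCheck {
  // Builds a left-deep chain such as And(And(And(leaf, leaf), leaf), leaf).
  def chain(depth: Int): Node =
    (1 to depth).foldLeft(Node("leaf")) { (acc, _) => Node("And", Seq(acc, Node("leaf"))) }

  // Length of the root's own line; with linear rendering it is independent of depth.
  def rootLineLength(depth: Int): Int =
    chain(depth).treeString.split('\n').head.length
}
```

A regression test would then compare, for example, `rootLineLength(10)` against `rootLineLength(1000)`; if a node's line started embedding its subtree again, the two would diverge sharply.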
[GitHub] spark pull request #17692: [SPARK-20398] [SQL] range() operator should inclu...
GitHub user ericl opened a pull request: https://github.com/apache/spark/pull/17692 [SPARK-20398] [SQL] range() operator should include cancellation reason when killed ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-19820 adds a reason field for why tasks were killed. However, for backwards compatibility, it left the old TaskKilledException constructor, which defaults to "unknown reason". The range() operator should use the constructor that fills in the reason rather than dropping it on task kill. ## How was this patch tested? I tested this manually. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ericl/spark fix-kill-reason-in-range Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17692.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17692 commit a690a631f6845767ec66f7a05908368e245baca3 Author: Eric Liang <e...@databricks.com> Date: 2017-04-19T23:03:23Z Wed Apr 19 16:03:23 PDT 2017
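The fix comes down to passing the recorded kill reason into the exception instead of calling the no-argument constructor. A minimal sketch with hypothetical stand-ins: `KilledException` mirrors the two constructors described above, and `FakeTaskContext` is an invented placeholder for per-task kill state, not Spark's `TaskContext` API.

```scala
// Hypothetical stand-in mirroring the two constructors described in the PR:
// one that takes a reason, and a legacy no-arg one defaulting to "unknown reason".
class KilledException(val reason: String) extends RuntimeException(reason) {
  def this() = this("unknown reason")
}

// Invented placeholder for per-task kill state; not Spark's TaskContext.
final class FakeTaskContext {
  @volatile var killReason: Option[String] = None
}

object RangeSketch {
  // Emits `count` values, checking for a kill request before each one.
  def range(ctx: FakeTaskContext, count: Long)(emit: Long => Unit): Unit = {
    var i = 0L
    while (i < count) {
      // Propagate the recorded reason instead of `new KilledException()`.
      ctx.killReason.foreach(r => throw new KilledException(r))
      emit(i)
      i += 1
    }
  }
}
```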
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17166 Added `killTask(id: TaskId, reason: String)` to SparkContext and a corresponding test. cc @joshrosen for the API changes. As discussed offline, it's very hard to preserve binary compatibility here since we have to move from a case object to a case class to add a reason.
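The compatibility problem can be seen in a small sketch. Code written against a case object matches on the singleton value, while the case-class version needs an extractor pattern, so callers compiled against the old shape cannot keep working unchanged. The trait and names below are simplified, hypothetical stand-ins for Spark's task end reasons.

```scala
// Simplified, hypothetical stand-ins for a task end-reason hierarchy.
sealed trait EndReason
case object TaskSucceeded extends EndReason
// Before: `case object TaskKilled extends EndReason` (a singleton with no fields).
// After: a case class, so each kill can carry its own reason.
final case class TaskKilled(reason: String) extends EndReason

object EndReasonSketch {
  def describe(r: EndReason): String = r match {
    case TaskSucceeded => "Success"
    // A pattern written against the old singleton (`case TaskKilled =>`) no longer
    // fits: `TaskKilled` now names the companion object, which is not an EndReason.
    case TaskKilled(reason) => s"TaskKilled: $reason"
  }
}
```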
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r104566606 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala --- @@ -30,8 +30,20 @@ private[spark] trait SchedulerBackend { def reviveOffers(): Unit def defaultParallelism(): Int - def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = + /** + * Requests that an executor kills a running task. + * + * @param taskId Id of the task. + * @param executorId Id of the executor the task is running on. + * @param interruptThread Whether the executor should interrupt the task thread. + * @param reason The reason for the task kill. + * @param shouldRetry Whether the scheduler should retry the task. + */ + def killTask( + taskId: Long, executorId: String, interruptThread: Boolean, reason: String, --- End diff -- fixed
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r104572407 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- cc @kayousterhout does removing this check seem safe to you? It looks like the only case `taskState != TaskState.KILLED` guards against here is cancelled speculative tasks. Since those are relatively rare, it seems ok to call revive offers in those cases unconditionally. Tasks from cancelled stages and jobs should still be handled by the remaining isZombie check.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r104595023 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -158,7 +158,8 @@ private[spark] class Executor( threadPool.execute(tr) } - def killTask(taskId: Long, interruptThread: Boolean): Unit = { + def killTask( --- End diff -- Fixed
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r104594970 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -2250,6 +2250,25 @@ class SparkContext(config: SparkConf) extends Logging { } /** + * Kill a given task. It will be retried. + * + * @param taskId the task ID to kill + */ + def killTask(taskId: Long): Unit = { +killTask(taskId, "cancelled") + } + + /** + * Kill a given task. It will be retried. + * + * @param taskId the task ID to kill + * @param reason the reason for killing the task, which should be a short string + */ + def killTask(taskId: Long, reason: String): Unit = { --- End diff -- Well, it turns out there's no good reason not to retry. The task will get retried eventually anyway unless the stage is cancelled.
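The overload pair in the diff keeps the single-argument call working by delegating with a default reason string. Here is a minimal sketch of that delegation pattern. `KillRecorder` and `ContextSketch` are hypothetical; they stand in for the scheduler and `SparkContext` so the effect can be observed.

```scala
import scala.collection.mutable

// Hypothetical stand-in for whatever receives kill requests (the scheduler in Spark).
final class KillRecorder {
  val requests: mutable.Buffer[(Long, String)] = mutable.Buffer.empty
}

final class ContextSketch(recorder: KillRecorder) {
  /** Kill a given task; it may be retried. Uses a default reason. */
  def killTask(taskId: Long): Unit = killTask(taskId, "cancelled")

  /** Kill a given task with a short, user-visible reason. */
  def killTask(taskId: Long, reason: String): Unit =
    recorder.requests += ((taskId, reason))
}
```

Using two overloads rather than a default parameter (`reason: String = "cancelled"`) keeps the original one-argument method present in bytecode. That matters for a public API, because callers compiled against the old signature still link.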
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r104595065 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala --- @@ -40,7 +40,8 @@ private[spark] object CoarseGrainedClusterMessages { // Driver to executors case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage - case class KillTask(taskId: Long, executor: String, interruptThread: Boolean) + case class KillTask( --- End diff -- Fixed
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
GitHub user ericl opened a pull request: https://github.com/apache/spark/pull/17166 [SPARK-19820] [core] Allow reason to be specified for task kill ## What changes were proposed in this pull request? This refactors the task kill path to allow specifying a reason for the task kill. The reason is propagated opaquely through events, and will show up in the UI automatically as `(N tasks killed: $reason)` and `TaskKilled: $reason`. It also makes the logic for whether a task failure should be retried explicit, rather than special-casing TaskKilled messages. cc @rxin ## How was this patch tested? Existing tests; I also tried killing some stages in the UI and verified that the messages are as expected. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ericl/spark kill-reason Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17166.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17166 commit e9178b61f356ecf4469a58a05ee4183e7beb4bf9 Author: Eric Liang <e...@google.com> Date: 2017-03-04T23:47:36Z Allow reason to be specified for task kill
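Both ideas in this description fit in a short sketch. The per-reason UI summary `(N tasks killed: $reason)` is a group-and-count over end reasons. Making retry explicit becomes a per-reason flag instead of a `case TaskKilled` special case. The names here (`Reason`, `countsTowardFailures`, `killSummary`) are hypothetical and are not Spark's actual classes or methods.

```scala
// Hypothetical reason hierarchy with an explicit failure-accounting flag.
sealed trait Reason {
  // Whether this failure should count toward the task's max-failure limit.
  def countsTowardFailures: Boolean = true
}
final case class ExceptionFailure(msg: String) extends Reason
final case class Killed(reason: String) extends Reason {
  // A kill is not the task's own fault, so it does not count as a failure.
  override def countsTowardFailures: Boolean = false
}

object KillSummary {
  // One "(N tasks killed: reason)" fragment per distinct reason, sorted for stable output.
  def killSummary(reasons: Seq[Reason]): Seq[String] =
    reasons.collect { case Killed(r) => r }
      .groupBy(identity)
      .toSeq
      .sortBy(_._1)
      .map { case (r, hits) => s"(${hits.size} tasks killed: $r)" }

  // Number of failures that count toward the retry limit.
  def countedFailures(reasons: Seq[Reason]): Int = reasons.count(_.countsTowardFailures)
}
```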
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17166 Yes -- this is useful if you want to implement extensions to Spark that can kill tasks for other reasons, e.g. if a debugger detects that a task has entered a bad state. Without this change, there is no way to provide the user feedback through the UI.
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/17166 That's right, it's not here. This PR only adds the distinction between tasks killed due to stage cancellation and speculation attempts. On Sun, Mar 5, 2017, 3:04 AM Mridul Muralidharan <notificati...@github.com> wrote: > If I did not miss it, there is no way for user to provide this information currently, right? Or is that coming in a subsequent PR?
[GitHub] spark pull request #17749: [SPARK-20450] [SQL] Unexpected first-query schema...
Github user ericl closed the pull request at: https://github.com/apache/spark/pull/17749
[GitHub] spark pull request #18714: [SPARK-20236][SQL] hive style partition overwrite
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/18714#discussion_r128909042 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -52,12 +55,22 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) */ @transient private var addedAbsPathFiles: mutable.Map[String, String] = null + @transient private var partitionPaths: mutable.Set[String] = null + + @transient private var stagingDir: Path = _ --- End diff -- Do you need to add these fields? It seems like they can be computed from `addedAbsPathFiles` and the constructor params respectively.
[GitHub] spark pull request #18714: [SPARK-20236][SQL] hive style partition overwrite
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/18714#discussion_r128913707 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -52,12 +55,22 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) */ @transient private var addedAbsPathFiles: mutable.Map[String, String] = null + @transient private var partitionPaths: mutable.Set[String] = null + + @transient private var stagingDir: Path = _ --- End diff -- I mean, we can turn stagingDir into `private def stagingDir` or a private variable in a function. Similarly, `partitionPaths` can be computed as `filesToMove.map(_.getPath).distinct` during the commit phase.
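The suggestion is to derive this state rather than store it in mutable `@transient` fields. The sketch below makes several assumptions. It uses `java.nio.file.Path` in place of Hadoop's `Path`. It picks a `.spark-staging-<jobId>` directory name for illustration. `CommitProtocolSketch` is hypothetical. `stagingDir` becomes a method computed from constructor parameters, and the set of partition directories is computed from the written files at commit time. The follow-up reply notes that partitions written to the default location still need separate tracking, so the second method shows only the reviewer's idea, not a complete replacement.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical sketch of the review suggestion; not Spark's HadoopMapReduceCommitProtocol.
final class CommitProtocolSketch(jobId: String, path: String) {
  // Computed on demand from constructor params instead of a mutable field.
  // Would be `private` in real code; public here so the example can be checked.
  // The ".spark-staging-" prefix is an assumption for illustration.
  def stagingDir: Path = Paths.get(path, ".spark-staging-" + jobId)

  // Distinct partition directories derived from the files being moved at commit
  // time, rather than accumulated in a separate mutable set during the job.
  def partitionPaths(filesToMove: Seq[Path]): Seq[Path] =
    filesToMove.map(_.getParent).distinct
}
```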
[GitHub] spark issue #18714: [SPARK-20236][SQL] hive style partition overwrite
Github user ericl commented on the issue: https://github.com/apache/spark/pull/18714 Got it. On Sun, Jul 23, 2017, 10:40 PM Wenchen Fan <notificati...@github.com> wrote: > *@cloud-fan* commented on this pull request. > In core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala <https://github.com/apache/spark/pull/18714#discussion_r128919802>: > > @@ -52,12 +55,22 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) > */ > @transient private var addedAbsPathFiles: mutable.Map[String, String] = null > > + @transient private var partitionPaths: mutable.Set[String] = null > + > + @transient private var stagingDir: Path = _ > > stagingDir may not be needed, but we do need partitionPaths, which tracks partitions with default path.
[GitHub] spark pull request #17633: [SPARK-20331][SQL] Enhanced Hive partition prunin...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/17633#discussion_r113566732 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -589,18 +590,34 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME)) .map(col => col.getName).toSet -filters.collect { - case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) => -s"${a.name} ${op.symbol} $v" - case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) => -s"$v ${op.symbol} ${a.name}" - case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType)) - if !varcharKeys.contains(a.name) => -s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}""" - case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute) - if !varcharKeys.contains(a.name) => -s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}""" -}.mkString(" and ") +def isFoldable(expr: Expression): Boolean = + (expr.dataType.isInstanceOf[IntegralType] || expr.dataType.isInstanceOf[StringType]) && + expr.foldable && + expr.deterministic + +def convertFoldable(expr: Expression): String = expr.dataType match { + case _: IntegralType => expr.eval(null).toString + case _: StringType => quoteStringLiteral(expr.eval(null).toString) +} + +def convert(filter: Expression): String = + filter match { +case In(a: Attribute, exprs) if exprs.forall(isFoldable) => + val or = exprs.map(expr => s"${a.name} = ${convertFoldable(expr)}").reduce(_ + " or " + _) + "(" + or + ")" +case op @ BinaryComparison(a: Attribute, expr2) +if !varcharKeys.contains(a.name) && isFoldable(expr2) => + s"(${a.name} ${op.symbol} ${convertFoldable(expr2)})" +case op @ BinaryComparison(expr1, a: Attribute) +if !varcharKeys.contains(a.name) && isFoldable(expr1) => + s"(${convertFoldable(expr1)} ${op.symbol} ${a.name})" +case op @ And(expr1, expr2) => + s"(${convert(expr1)} and ${convert(expr2)})" +case op @ Or(expr1, 
expr2) => + s"(${convert(expr1)} or ${convert(expr2)})" + } + +filters.flatMap(f => Try(convert(f)).toOption).mkString(" and ") --- End diff -- Why do we need a `Try` here?
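One way to avoid the `Try` is to make the conversion total. If it returns `Option[String]`, unsupported expressions yield `None` instead of escaping as a `MatchError`. The sketch below uses a tiny hypothetical expression ADT in place of Catalyst's classes, with simplified string quoting (no escaping). It keeps the diff's rule that both sides of `and`/`or` must convert, and it drops unconvertible top-level filters.

```scala
// Tiny hypothetical expression ADT standing in for Catalyst expressions.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class IntLit(v: Long) extends Expr
final case class StrLit(v: String) extends Expr
final case class Cmp(op: String, left: Expr, right: Expr) extends Expr
final case class In(attr: Attr, values: Seq[Expr]) extends Expr
final case class And(l: Expr, r: Expr) extends Expr
final case class Or(l: Expr, r: Expr) extends Expr
final case class Udf(name: String) extends Expr // something the metastore cannot evaluate

object HiveFilterSketch {
  // Simplified quoting for illustration; real code must follow the metastore's rules.
  private def literal(e: Expr): Option[String] = e match {
    case IntLit(v) => Some(v.toString)
    case StrLit(v) => Some("\"" + v + "\"")
    case _         => None
  }

  // Total function: returns None for anything it cannot push down, so no Try is needed.
  def convert(e: Expr): Option[String] = e match {
    case In(a, values) if values.nonEmpty =>
      val lits = values.map(literal)
      if (lits.forall(_.isDefined)) Some(lits.map(l => s"${a.name} = ${l.get}").mkString("(", " or ", ")"))
      else None
    case Cmp(op, a: Attr, v) => literal(v).map(l => s"(${a.name} $op $l)")
    case Cmp(op, v, a: Attr) => literal(v).map(l => s"($l $op ${a.name})")
    case And(l, r)           => for (x <- convert(l); y <- convert(r)) yield s"($x and $y)"
    case Or(l, r)            => for (x <- convert(l); y <- convert(r)) yield s"($x or $y)"
    case _                   => None
  }

  // Dropping an unconvertible top-level filter is safe for pruning: it only widens the result.
  def convertFilters(filters: Seq[Expr]): String = filters.flatMap(convert).mkString(" and ")
}
```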
[GitHub] spark pull request #17749: [SPARK-20450] [SQL] Unexpected first-query schema...
GitHub user ericl opened a pull request: https://github.com/apache/spark/pull/17749 [SPARK-20450] [SQL] Unexpected first-query schema inference cost with 2.1.1 ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-19611 fixes a regression from 2.0 where Spark silently fails to read case-sensitive fields when the table properties lack a case-sensitive schema. The fix is to detect this situation, infer the schema, and write the case-sensitive schema into the metastore. However, this can incur an unexpected performance hit the first time such a problematic table is queried (and there is a high false-positive rate here, since most tables don't actually have case-sensitive fields). This PR changes the default to NEVER_INFER (same behavior as 2.1.0). In 2.2, we can consider leaving the default as INFER_AND_SAVE. ## How was this patch tested? Unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ericl/spark spark-20450 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17749.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17749 commit 4c0ff63044199f59ae25ce887485c16b4b83e663 Author: Eric Liang <e...@databricks.com> Date: 2017-04-24T19:28:11Z [SPARK-20450] [SQL] Unexpected first-query schema inference cost with 2.1.1 RC
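The trade-off between the two modes named above can be modeled as a small decision function. Under INFER_AND_SAVE, a table without a stored case-sensitive schema pays for inference and a metastore write on first query. Under NEVER_INFER, it pays nothing but keeps the 2.1.0 behavior. This sketch is a hypothetical model of that decision, not Spark's implementation. Only the two mode names come from the PR text; the types are invented.

```scala
// Hypothetical model of the two inference modes discussed above; not Spark's code.
sealed trait InferenceMode
case object InferAndSave extends InferenceMode
case object NeverInfer extends InferenceMode

// Whether the table properties already carry a case-sensitive schema.
final case class TableMeta(hasCaseSensitiveSchemaProp: Boolean)

// What happens the first time the table is queried.
final case class FirstQueryPlan(inferSchema: Boolean, saveToMetastore: Boolean)

object InferenceSketch {
  def plan(mode: InferenceMode, table: TableMeta): FirstQueryPlan =
    if (table.hasCaseSensitiveSchemaProp) FirstQueryPlan(inferSchema = false, saveToMetastore = false)
    else mode match {
      // Same behavior as 2.1.0: no extra cost, but case-sensitive fields may not be read.
      case NeverInfer => FirstQueryPlan(inferSchema = false, saveToMetastore = false)
      // Correct schema, at the price of a possibly expensive first query.
      case InferAndSave => FirstQueryPlan(inferSchema = true, saveToMetastore = true)
    }
}
```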
[GitHub] spark issue #15306: [SPARK-17740] Spark tests should mock / interpose HDFS t...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/15306 Hm, we could move the actual throw to afterAll(). That would cause a suite abort instead, but presumably leave the test errors intact.
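The idea can be sketched as follows, assuming a suite that checks for leaked file streams after each test. Instead of throwing in the per-test hook, which would mask the test's own error, leaks are recorded and the throw is deferred to `afterAll()`. `LeakCheckingSuite` and its hooks are hypothetical; a real suite would mix in ScalaTest's `BeforeAndAfterEach` and `BeforeAndAfterAll`.

```scala
import scala.collection.mutable

// Hypothetical suite skeleton; a real one would use ScalaTest lifecycle traits.
class LeakCheckingSuite(openStreams: () => Int) {
  private val leaks = mutable.Buffer.empty[String]

  // Per-test hook: record leaks but do not throw, so each test's own failure stays visible.
  def afterEach(testName: String): Unit = {
    val n = openStreams()
    if (n > 0) leaks += s"$testName leaked $n stream(s)"
  }

  // Suite-level hook: throwing here aborts the suite after per-test results are reported.
  def afterAll(): Unit =
    if (leaks.nonEmpty) throw new IllegalStateException(leaks.mkString("; "))
}
```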
[GitHub] spark pull request #18070: [SPARK-20713][Spark Core] Convert CommitDenied to...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/18070#discussion_r118628471 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -338,6 +340,9 @@ private[spark] class Executor( metricsSystem = env.metricsSystem) threwException = false res +} catch { + case _: CommitDeniedException => +throw new TaskKilledException("commit denied") --- End diff -- Doesn't a stage abort also cause tasks to show up as killed (due to "stage cancelled")? https://github.com/apache/spark/blob/95aef660b73ec931e746d1ec8ae7848762ba0d7c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1531 It seems to me that CommitDenied always implies the task is killed, in which case it might be fine to convert all CommitDeniedExceptions into TaskKilled. Btw, there's a catch block below, `case CausedBy(cDE: CommitDeniedException) =>`, which seems like the right place to be doing this handling.
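The suggested handling can be sketched with a root-cause extractor like the `CausedBy` pattern mentioned above, mapping a denied commit to a kill rather than a failure. Everything here is a simplified stand-in. `CausedBy` is written out locally rather than imported from Spark, and `CommitDenied`, `TaskKilledOutcome`, and `TaskFailedOutcome` are hypothetical.

```scala
// Local root-cause extractor in the spirit of the `CausedBy` pattern referenced above.
object CausedBy {
  // Follows getCause links to the innermost throwable (getCause returns null at the root).
  @annotation.tailrec
  def rootCause(t: Throwable): Throwable =
    if (t.getCause == null) t else rootCause(t.getCause)

  def unapply(t: Throwable): Option[Throwable] = Some(rootCause(t))
}

// Hypothetical stand-ins.
class CommitDenied(msg: String) extends Exception(msg)
sealed trait TaskOutcome
final case class TaskKilledOutcome(reason: String) extends TaskOutcome
final case class TaskFailedOutcome(message: String) extends TaskOutcome

object ExecutorSketch {
  def outcomeOf(t: Throwable): TaskOutcome = t match {
    // A denied commit means another attempt committed, so report a kill, not a failure.
    case CausedBy(_: CommitDenied) => TaskKilledOutcome("commit denied")
    case other                     => TaskFailedOutcome(String.valueOf(other.getMessage))
  }
}
```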
[GitHub] spark issue #21185: [SPARK-23894][CORE][SQL] Defensively clear ActiveSession...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/21185 This makes sense to me. It would be slightly better to clear it where the session is getting leaked through threads, but if that's hard, then this looks good.
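Sessions can leak through threads because a value held in an inheritable thread-local is copied into any thread created while it is set. The sketch below shows this with a hypothetical `ActiveSession` holder, backed by `java.lang.InheritableThreadLocal` and holding a plain string. It is not Spark's session tracking. The example compares a child thread that inherits the value with one that clears it defensively before doing work.

```scala
// Hypothetical holder for a per-thread "active session", inherited by child threads.
object ActiveSession {
  private val active = new InheritableThreadLocal[String]
  def set(s: String): Unit = active.set(s)
  def get: Option[String] = Option(active.get)
  def clear(): Unit = active.remove()
}

object LeakDemo {
  // Runs on a freshly created thread and returns the active session it observed.
  def observeInChild(clearFirst: Boolean): Option[String] = {
    var seen: Option[String] = None
    val t = new Thread(new Runnable {
      def run(): Unit = {
        if (clearFirst) ActiveSession.clear() // defensive clear: don't trust inherited state
        seen = ActiveSession.get
      }
    })
    t.start()
    t.join() // join() makes the child's write to `seen` visible to this thread
    seen
  }
}
```

Clearing in the child removes only the child's copy; the parent thread's value is unaffected.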
[GitHub] spark issue #21934: [SPARK-24951][SQL] Table valued functions should throw A...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/21934 LGTM
[GitHub] spark issue #20971: [SPARK-23809][SQL][backport] Active SparkSession should ...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/20971 retest this please
[GitHub] spark pull request #21058: [SPARK-23971] Should not leak Spark sessions acro...
GitHub user ericl opened a pull request: https://github.com/apache/spark/pull/21058 [SPARK-23971] Should not leak Spark sessions across test suites ## What changes were proposed in this pull request? Many suites currently leak Spark sessions (sometimes with stopped SparkContexts) via the thread-local active Spark session and default Spark session. We should attempt to clean these up and detect when this happens to improve the reproducibility of tests. ## How was this patch tested? Existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/ericl/spark clear-session Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21058.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21058 commit 92afcc2f7a5dfb2bc5aa94e009ef1787f42a83ab Author: Eric Liang <ekl@...> Date: 2018-04-12T19:43:48Z clear session
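The clean-up-and-detect idea at suite teardown can be sketched against a hypothetical `SessionRegistry` with an active (per-thread) slot and a default (global) slot. In real Spark suites, the clearing step would go through `SparkSession.clearActiveSession()` and `SparkSession.clearDefaultSession()`. Here the teardown reports whatever was left behind, then clears it so the next suite starts clean.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the active (thread-local) and default (global) session slots.
object SessionRegistry {
  private val active = new InheritableThreadLocal[String]
  private val defaultSession = new AtomicReference[String](null)

  def setActive(s: String): Unit = active.set(s)
  def setDefault(s: String): Unit = defaultSession.set(s)
  def clearActive(): Unit = active.remove()
  def clearDefault(): Unit = defaultSession.set(null)

  // Describes any session still registered in either slot.
  def leaked: Seq[String] =
    Option(active.get).map("active: " + _).toSeq ++
      Option(defaultSession.get).map("default: " + _).toSeq
}

object SuiteTeardown {
  // Intended to run in afterAll(): report leftovers, then clear both slots.
  def afterAll(): Seq[String] = {
    val found = SessionRegistry.leaked
    SessionRegistry.clearActive()
    SessionRegistry.clearDefault()
    found
  }
}
```

A suite could fail or log when `afterAll()` returns a non-empty list, which turns a silent cross-suite leak into a visible, reproducible error.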
[GitHub] spark issue #21058: [SPARK-23971] Should not leak Spark sessions across test...
Github user ericl commented on the issue: https://github.com/apache/spark/pull/21058 This is a followup to https://github.com/apache/spark/pull/20971 @gatorsmile
[GitHub] spark pull request #20971: [SPARK-23809][SQL][backport] Active SparkSession ...
Github user ericl closed the pull request at: https://github.com/apache/spark/pull/20971