[GitHub] [spark] 10110346 commented on a change in pull request #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
10110346 commented on a change in pull request #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
URL: https://github.com/apache/spark/pull/24214#discussion_r269413253

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ##
@@ -889,6 +889,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
+
+  val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")

Review comment:
I created a PR separately to fix this problem before, #23998

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] 10110346 commented on a change in pull request #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
10110346 commented on a change in pull request #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
URL: https://github.com/apache/spark/pull/24214#discussion_r269413177

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ##
@@ -674,6 +674,8 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized

Review comment:
#18169 fixed this problem in the same way before, but I don't know why it was reverted.
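For readers following this thread, the effect of the `doCanonicalize` line above can be sketched with toy classes. This is only an illustration under stated assumptions, not Spark code: `Plan`, `Scan`, `Subquery`, and `ReuseDemo.dedupe` are hypothetical names, and the sketch assumes that reuse is decided by comparing canonicalized plans.

```scala
// Toy stand-ins for physical plan nodes; these are NOT Spark's classes.
sealed trait Plan {
  // Form used for equality checks when looking for a reusable plan.
  def canonicalized: Plan = this
}

final case class Scan(table: String) extends Plan

final case class Subquery(name: String, child: Plan) extends Plan {
  // Delegate to the child so that the generated name does not affect comparison.
  override def canonicalized: Plan = child.canonicalized
}

object ReuseDemo {
  // Keep one representative per canonical form; equivalent plans seen later are dropped.
  def dedupe(plans: Seq[Plan]): Seq[Plan] = {
    val seen = scala.collection.mutable.LinkedHashMap.empty[Plan, Plan]
    plans.foreach(p => seen.getOrElseUpdate(p.canonicalized, p))
    seen.values.toSeq
  }
}
```

Without the override, two `Subquery` nodes that wrap the same child but carry different names would compare as different and would not be reused.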
[GitHub] [spark] HeartSaVioR commented on a change in pull request #23747: [SPARK-26848][SQL] Introduce new option to Kafka source: offset by timestamp (starting/ending)
HeartSaVioR commented on a change in pull request #23747: [SPARK-26848][SQL] Introduce new option to Kafka source: offset by timestamp (starting/ending)
URL: https://github.com/apache/spark/pull/23747#discussion_r269410616

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ##
@@ -135,23 +135,79 @@ private[kafka010] class KafkaOffsetReader(
   def fetchSpecificOffsets(
       partitionOffsets: Map[TopicPartition, Long],
       reportDataLoss: String => Unit): KafkaSourceOffset = {
-    val fetched = runUninterruptibly {
-      withRetriesWithoutInterrupt {
-        // Poll to get the latest assigned partitions
-        consumer.poll(0)
-        val partitions = consumer.assignment()
+    val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
+      assert(partitions.asScala == partitionOffsets.keySet,
+        "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
+          "Use -1 for latest, -2 for earliest, if you don't care.\n" +
+          s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
+      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
+    }
-        // Call `position` to wait until the potential offset request triggered by `poll(0)` is
-        // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
-        // `poll(0)` may reset offsets that should have been set by another request.
-        partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+    val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ =>
+      partitionOffsets
+    }
+
+    val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { fetched =>
+      partitionOffsets.foreach {
+        case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+          off != KafkaOffsetRangeLimit.EARLIEST =>
+          if (fetched(tp) != off) {
+            reportDataLoss(
+              s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
+          }
+        case _ =>
+        // no real way to check that beginning or end is reasonable
+      }
+    }
+
+    fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets,
+      fnAssertFetchedOffsets)
+  }
+
+  def fetchSpecificTimestampBasedOffsets(topicTimestamps: Map[String, Long]): KafkaSourceOffset = {
+    val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
+      val assignedTopics = partitions.asScala.map(_.topic())
+      assert(assignedTopics == topicTimestamps.keySet,
+        "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
+          s"topics. Specified: ${topicTimestamps.keySet} Assigned: $assignedTopics")
+      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $topicTimestamps")
+    }
+
+    val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = {
+      partitions => {
+        val partitionTimestamps: ju.Map[TopicPartition, java.lang.Long] =
+          partitions.asScala.map { topicAndPartition =>
+            topicAndPartition -> java.lang.Long.valueOf(topicTimestamps(topicAndPartition.topic()))
+          }.toMap.asJava
+
+        val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
+          consumer.offsetsForTimes(partitionTimestamps)

Review comment:
Thought about this more. I still need to check whether it is possible, but the ideal behavior is to not read from such partitions while still reading from the partitions that can provide matched offsets.

We may have assumed that we must read from all partitions we are assigned, which forces a choice between failing the query and taking the risk of reading from latest. Ideally we shouldn't do that.

The problem is that, for a streaming query, the option is used only once and ignored afterwards, which defeats what we are trying to do. For the timestamp offset case, the query should still pull offsets by timestamp for every batch and compare them with the offsets in the checkpoint, if one exists, to decide whether it can read from such a partition.

This sounds like a non-trivial effort to me, so maybe we should start with the safest approach, simply failing the query, and address the above in a follow-up issue. (Unfortunately, that would work badly for batch queries.)

@jose-torres What do you think?

cc. @koeninger @gaborgsomogyi
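The two policies discussed in this comment can be sketched without a Kafka dependency. This is a hedged illustration only: `TP` is a hypothetical stand-in for `TopicPartition`, `TimestampOffsets` is an invented helper, and `Option[Long]` models the case where a timestamp lookup finds no matching offset for a partition (as far as I know, `KafkaConsumer.offsetsForTimes` reports that case with a `null` value).

```scala
// Hypothetical stand-in for Kafka's TopicPartition.
final case class TP(topic: String, partition: Int)

object TimestampOffsets {
  // Split lookup results into partitions with a matching offset and partitions without one.
  def split(found: Map[TP, Option[Long]]): (Map[TP, Long], Set[TP]) = {
    val matched = found.collect { case (tp, Some(off)) => tp -> off }
    val unmatched = found.collect { case (tp, None) => tp }.toSet
    (matched, unmatched)
  }

  // "Fail the query" policy: the conservative starting point suggested in the comment.
  def resolveOrFail(found: Map[TP, Option[Long]]): Map[TP, Long] = {
    val (matched, unmatched) = split(found)
    require(unmatched.isEmpty, s"No offset matched the requested timestamp for: $unmatched")
    matched
  }
}
```

`split` corresponds to the "ideal" behavior (read only from matched partitions), while `resolveOrFail` corresponds to failing fast. The per-batch re-check against checkpointed offsets mentioned in the comment is not modeled here.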
[GitHub] [spark] chakravarthiT edited a comment on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
chakravarthiT edited a comment on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
URL: https://github.com/apache/spark/pull/24136#issuecomment-476948267

@maropu @HyukjinKwon I have handled the review comments, and the UT now passes, since the message is evaluated only when the specific log level is set.
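The point about the message being evaluated only when the level is enabled can be shown with a by-name parameter. This is a minimal sketch, not the code in the PR: `LazyLogDemo`, `Logger`, and `evaluations` are hypothetical names, and a boolean flag stands in for a real log-level check.

```scala
object LazyLogDemo {
  // Hypothetical minimal logger; `msg` is by-name, so it is evaluated only when enabled.
  final class Logger(enabled: Boolean) {
    def log(msg: => String): Unit = if (enabled) println(msg)
  }

  // Counts how many times the message expression was actually evaluated.
  def evaluations(enabled: Boolean): Int = {
    var count = 0
    new Logger(enabled).log { count += 1; "plan changed" }
    count
  }
}
```

With a by-name parameter, an expensive message (for example a rendered plan diff) costs nothing when the configured level is off.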
[GitHub] [spark] felixcheung commented on issue #23721: [SPARK-26797][SQL][WIP][test-maven] Start using the new logical types API of Parquet 1.11.0 instead of the deprecated one
felixcheung commented on issue #23721: [SPARK-26797][SQL][WIP][test-maven] Start using the new logical types API of Parquet 1.11.0 instead of the deprecated one
URL: https://github.com/apache/spark/pull/23721#issuecomment-476985992

That's great. Honestly, we can't merge this until Parquet 1.11 is officially released.
[GitHub] [spark] cloud-fan closed pull request #24225: [SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
cloud-fan closed pull request #24225: [SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
URL: https://github.com/apache/spark/pull/24225
[GitHub] [spark] gatorsmile closed pull request #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1
gatorsmile closed pull request #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1
URL: https://github.com/apache/spark/pull/24119
[GitHub] [spark] cloud-fan commented on issue #24225: [SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
cloud-fan commented on issue #24225: [SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
URL: https://github.com/apache/spark/pull/24225#issuecomment-476983293

Thanks, merging to master!
[GitHub] [spark] gatorsmile commented on issue #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1
gatorsmile commented on issue #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1
URL: https://github.com/apache/spark/pull/24119#issuecomment-476982813

I am merging this to master. @wangyum Please submit your follow-up PR for making the actual changes. Thanks!
[GitHub] [spark] gatorsmile commented on a change in pull request #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1
gatorsmile commented on a change in pull request #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1 URL: https://github.com/apache/spark/pull/24119#discussion_r269410080 ## File path: sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.orc.storage.common.`type`.HiveDecimal +import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder +import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types._ + +/** + * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down. + * + * Due to limitation of ORC `SearchArgument` builder, we had to end up with a pretty weird double- + * checking pattern when converting `And`/`Or`/`Not` filters. + * + * An ORC `SearchArgument` must be built in one pass using a single builder. 
For example, you can't + * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite + * different from the cases in Spark SQL or Parquet, where complex filters can be easily built using + * existing simpler ones. + * + * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and + * `startNot()` mutate internal state of the builder instance. This forces us to translate all + * convertible filters with a single builder instance. However, before actually converting a filter, + * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is + * found, we may already end up with a builder whose internal state is inconsistent. + * + * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then + * try to convert its children. Say we convert `left` child successfully, but find that `right` + * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent + * now. + * + * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their + * children with brand new builders, and only do the actual conversion with the right builder + * instance when the children are proven to be convertible. + * + * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of + * builder methods mentioned above can only be found in test code, where all tested filters are + * known to be convertible. + */ +private[sql] object OrcFilters extends OrcFiltersBase { Review comment: Talked with @wangyum offline. All these functions need a change in v2.3.4.
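The double-checking pattern described in the quoted `OrcFilters` doc comment can be sketched in plain Scala. This is a minimal model under stated assumptions, not the ORC or Spark API: `Filter`, `Leaf`, `Builder`, `convert` and `toSarg` are hypothetical names, and the builder only records which calls were made on it.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model; none of these names come from ORC or Spark.
object DoubleCheckSketch {
  sealed trait Filter
  case class Leaf(name: String, convertible: Boolean) extends Filter
  case class And(left: Filter, right: Filter) extends Filter
  case class Not(child: Filter) extends Filter

  // A builder whose calls mutate internal state and cannot be rolled back.
  final class Builder {
    private val calls = ArrayBuffer[String]()
    def startAnd(): Builder = { calls += "startAnd"; this }
    def startNot(): Builder = { calls += "startNot"; this }
    def leaf(name: String): Builder = { calls += name; this }
    def end(): Builder = { calls += "end"; this }
    def build(): String = calls.mkString(" ")
  }

  // Returns Some(builder) only when the whole filter is convertible.
  def convert(f: Filter, b: Builder): Option[Builder] = f match {
    case Leaf(name, true) => Some(b.leaf(name))
    case Leaf(_, false)   => None
    case And(l, r) =>
      for {
        // First pass: probe both children with throwaway builders.
        _ <- convert(l, new Builder)
        _ <- convert(r, new Builder)
        // Second pass: only now touch the shared builder `b`.
        lhs <- convert(l, b.startAnd())
        rhs <- convert(r, lhs)
      } yield rhs.end()
    case Not(c) =>
      for {
        _ <- convert(c, new Builder)
        inner <- convert(c, b.startNot())
      } yield inner.end()
  }

  def toSarg(f: Filter): Option[String] = convert(f, new Builder).map(_.build())
}
```

In this sketch the shared builder is never mutated when a child turns out to be inconvertible, because the probing pass fails before `startAnd()` or `startNot()` is called on it.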
[GitHub] [spark] SparkQA commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
SparkQA commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#issuecomment-476981979 **[Test build #104002 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/104002/testReport)** for PR 24214 at commit [`40dc3f9`](https://github.com/apache/spark/commit/40dc3f91a10981ba4aa31c03a61f7c79704143ca).
[GitHub] [spark] AmplabJenkins removed a comment on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
AmplabJenkins removed a comment on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#issuecomment-476981440 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
AmplabJenkins removed a comment on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#issuecomment-476981443 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/9352/ Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
AmplabJenkins commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#issuecomment-476981443 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/9352/ Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
AmplabJenkins commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#issuecomment-476981440 Merged build finished. Test PASSed.
[GitHub] [spark] gatorsmile commented on a change in pull request #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1
gatorsmile commented on a change in pull request #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1 URL: https://github.com/apache/spark/pull/24119#discussion_r269408856 ## File path: sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.orc.storage.common.`type`.HiveDecimal +import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder +import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types._ + +/** + * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down. + * + * Due to limitation of ORC `SearchArgument` builder, we had to end up with a pretty weird double- + * checking pattern when converting `And`/`Or`/`Not` filters. + * + * An ORC `SearchArgument` must be built in one pass using a single builder. 
For example, you can't + * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite + * different from the cases in Spark SQL or Parquet, where complex filters can be easily built using + * existing simpler ones. + * + * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and + * `startNot()` mutate internal state of the builder instance. This forces us to translate all + * convertible filters with a single builder instance. However, before actually converting a filter, + * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is + * found, we may already end up with a builder whose internal state is inconsistent. + * + * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then + * try to convert its children. Say we convert `left` child successfully, but find that `right` + * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent + * now. + * + * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their + * children with brand new builders, and only do the actual conversion with the right builder + * instance when the children are proven to be convertible. + * + * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of + * builder methods mentioned above can only be found in test code, where all tested filters are + * known to be convertible. + */ +private[sql] object OrcFilters extends OrcFiltersBase { Review comment: Are all the functions in this object different between v2.3.4 and v1.2?
[GitHub] [spark] SparkQA commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
SparkQA commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#issuecomment-476979983 **[Test build #104001 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/104001/testReport)** for PR 24214 at commit [`7177ed7`](https://github.com/apache/spark/commit/7177ed7b261c89bc2dd77bb7c554168c07743316).
[GitHub] [spark] AmplabJenkins removed a comment on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
AmplabJenkins removed a comment on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#issuecomment-476979501 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/9351/ Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
AmplabJenkins removed a comment on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#issuecomment-476979495 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
AmplabJenkins commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#issuecomment-476979495 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
AmplabJenkins commented on issue #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#issuecomment-476979501 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/9351/ Test PASSed.
[GitHub] [spark] adrian-wang commented on a change in pull request #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
adrian-wang commented on a change in pull request #24214: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec` URL: https://github.com/apache/spark/pull/24214#discussion_r269407893 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ## @@ -125,15 +125,15 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] { case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = { -if (!conf.exchangeReuseEnabled) { +if (!conf.subqueryReuseEnabled) { return plan } // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls. val subqueries = mutable.HashMap[StructType, ArrayBuffer[SubqueryExec]]() plan transformAllExpressions { case sub: ExecSubqueryExpression => val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[SubqueryExec]()) -val sameResult = sameSchema.find(_.sameResult(sub.plan)) +val sameResult = sameSchema.find(_.child.sameResult(sub.plan.child)) Review comment: I have updated the code as @cloud-fan suggested; thanks for all your reviews!
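The reuse rule in the diff above buckets subqueries by schema and then compares child plans within a bucket. The sketch below models that idea in plain Scala without Spark. `Plan`, `Subquery` and `reuse` are hypothetical stand-ins, and `sameResult` is reduced to comparing an assumed canonical string. It also assumes, for illustration only, that two wrappers can differ in name while their children compute the same result.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for SparkPlan and SubqueryExec; not Spark classes.
object ReuseSketch {
  case class Plan(schema: Seq[String], canonical: String) {
    // Assumption: plans produce the same result when canonical forms match.
    def sameResult(other: Plan): Boolean = canonical == other.canonical
  }
  case class Subquery(name: String, child: Plan)

  // For each input subquery, returns the instance that should be executed.
  def reuse(subs: Seq[Subquery]): Seq[Subquery] = {
    // Bucket by schema first so that only same-schema plans are compared.
    val bySchema = mutable.HashMap[Seq[String], ArrayBuffer[Subquery]]()
    subs.map { sub =>
      val sameSchema = bySchema.getOrElseUpdate(sub.child.schema, ArrayBuffer[Subquery]())
      // Compare the children, so the wrapper's name does not block reuse.
      sameSchema.find(_.child.sameResult(sub.child)) match {
        case Some(existing) => existing
        case None =>
          sameSchema += sub
          sub
      }
    }
  }
}
```

With two wrappers named differently around the same child, the second one is replaced by the first, which is the effect the `_.child.sameResult(sub.plan.child)` change appears to aim for.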
[GitHub] [spark] beliefer commented on a change in pull request #24218: [SPARK-27281][DStreams] Change the way latest kafka offsets are retrieved to consumer#endOffsets
beliefer commented on a change in pull request #24218: [SPARK-27281][DStreams] Change the way latest kafka offsets are retrieved to consumer#endOffsets URL: https://github.com/apache/spark/pull/24218#discussion_r269399030 ## File path: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ## @@ -207,8 +207,12 @@ private[spark] class DirectKafkaInputDStream[K, V]( currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap // find latest available offsets -c.seekToEnd(currentOffsets.keySet.asJava) -parts.map(tp => tp -> c.position(tp)).toMap +val end = c.endOffsets(currentOffsets.keySet.asJava) Review comment: This change may resolve your issue, but it does not look like a general solution.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #23747: [SPARK-26848][SQL] Introduce new option to Kafka source: offset by timestamp (starting/ending)
HeartSaVioR commented on a change in pull request #23747: [SPARK-26848][SQL] Introduce new option to Kafka source: offset by timestamp (starting/ending) URL: https://github.com/apache/spark/pull/23747#discussion_r269397115 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -135,23 +135,79 @@ private[kafka010] class KafkaOffsetReader( def fetchSpecificOffsets( partitionOffsets: Map[TopicPartition, Long], reportDataLoss: String => Unit): KafkaSourceOffset = { -val fetched = runUninterruptibly { - withRetriesWithoutInterrupt { -// Poll to get the latest assigned partitions -consumer.poll(0) -val partitions = consumer.assignment() +val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions => + assert(partitions.asScala == partitionOffsets.keySet, +"If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + + "Use -1 for latest, -2 for earliest, if you don't care.\n" + + s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}") + logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets") +} -// Call `position` to wait until the potential offset request triggered by `poll(0)` is -// done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by -// `poll(0)` may reset offsets that should have been set by another request. 
-partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {}) +val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => + partitionOffsets +} + +val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { fetched => + partitionOffsets.foreach { +case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && + off != KafkaOffsetRangeLimit.EARLIEST => + if (fetched(tp) != off) { +reportDataLoss( + s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}") + } +case _ => +// no real way to check that beginning or end is reasonable + } +} + +fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets, + fnAssertFetchedOffsets) + } + + def fetchSpecificTimestampBasedOffsets(topicTimestamps: Map[String, Long]): KafkaSourceOffset = { +val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions => + val assignedTopics = partitions.asScala.map(_.topic()) + assert(assignedTopics == topicTimestamps.keySet, +"If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " + + s"topics. Specified: ${topicTimestamps.keySet} Assigned: $assignedTopics") + logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $topicTimestamps") +} + +val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { + partitions => { +val partitionTimestamps: ju.Map[TopicPartition, java.lang.Long] = + partitions.asScala.map { topicAndPartition => +topicAndPartition -> java.lang.Long.valueOf(topicTimestamps(topicAndPartition.topic())) + }.toMap.asJava + +val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] = + consumer.offsetsForTimes(partitionTimestamps) Review comment: Since we see both possibilities, maybe we could add an option that fails the query by default in this case, with a log message suggesting that users toggle it if they really want to just start reading. I'll address it soon.
[GitHub] [spark] cloud-fan edited a comment on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct
cloud-fan edited a comment on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct URL: https://github.com/apache/spark/pull/24215#issuecomment-476959548 I'm afraid this may cause a big regression, as aggregate is expensive and here we do it twice. Aggregate needs to build a hash map, which may spill to disk if the data is large. If the aggregate doesn't help with data reduction, we hit a regression. Unless there is a proposal to solve the regression, I think we shouldn't merge it.
[GitHub] [spark] shivusondur commented on issue #23926: [SPARK-26872][STREAMING] Use a configurable value for final termination in the JobScheduler.stop() method
shivusondur commented on issue #23926: [SPARK-26872][STREAMING] Use a configurable value for final termination in the JobScheduler.stop() method URL: https://github.com/apache/spark/pull/23926#issuecomment-476959691 @smrosenberry "only developer can configure" means it is still configurable by everyone, but the corresponding details are not exposed to users through the documentation.
[GitHub] [spark] cloud-fan commented on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct
cloud-fan commented on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct URL: https://github.com/apache/spark/pull/24215#issuecomment-476959548 I'm afraid this may cause a big regression, as aggregate is expensive and here we do it twice. Aggregate needs to build a hash map, which may spill to disk if the data is large. If the aggregate doesn't help with data reduction, we hit a regression. Unless there is a proposal to resolve the regression, I think we shouldn't merge it.
[GitHub] [spark] AmplabJenkins commented on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views
AmplabJenkins commented on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views URL: https://github.com/apache/spark/pull/24200#issuecomment-476958489 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views
AmplabJenkins removed a comment on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views URL: https://github.com/apache/spark/pull/24200#issuecomment-476958489 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views
AmplabJenkins commented on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views URL: https://github.com/apache/spark/pull/24200#issuecomment-476958495 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/103999/ Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views
AmplabJenkins removed a comment on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views URL: https://github.com/apache/spark/pull/24200#issuecomment-476958495 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/103999/ Test PASSed.
[GitHub] [spark] cloud-fan commented on a change in pull request #24195: [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp
cloud-fan commented on a change in pull request #24195: [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp URL: https://github.com/apache/spark/pull/24195#discussion_r269395838 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ## @@ -1231,6 +1238,11 @@ case class MonthsBetween( case class ToUTCTimestamp(left: Expression, right: Expression) extends BinaryExpression with ImplicitCastInputTypes { + if (!SQLConf.get.utcTimestampFuncEnabled) { +throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0. " + + s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.") + } Review comment: As I said, I don't think a warning log is the right way to deprecate a SQL function, as users won't see it. This is not good either, but it is the best I can think of.
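The approach discussed in this review, failing fast when a deprecated function is constructed while its flag is off, can be sketched without Spark. Everything below is hypothetical: `Conf`, its key string, `DisabledFunctionException` and `ToUtc` are stand-ins chosen for illustration, not the `SQLConf` or `AnalysisException` classes from the diff.

```scala
// Hypothetical sketch of a config-gated function; not Spark code.
object GateSketch {
  // Stand-in for a session configuration entry (key name is made up).
  object Conf {
    val key = "example.utcTimestampFunc.enabled"
    @volatile var enabled: Boolean = false
  }

  final class DisabledFunctionException(msg: String) extends RuntimeException(msg)

  // The check runs in the constructor, so a disabled function fails
  // as soon as the expression is built rather than when it is evaluated.
  case class ToUtc(timestampMillis: Long, offsetMillis: Long) {
    if (!Conf.enabled) {
      throw new DisabledFunctionException(
        s"The ToUtc function is disabled. Set ${Conf.key} to true to enable it.")
    }
    def eval: Long = timestampMillis - offsetMillis
  }
}
```

Unlike a warning in the logs, the exception reaches the user who wrote the query, which is the trade-off the comment describes.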
[GitHub] [spark] SparkQA removed a comment on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views
SparkQA removed a comment on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views URL: https://github.com/apache/spark/pull/24200#issuecomment-476882510 **[Test build #103999 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/103999/testReport)** for PR 24200 at commit [`7b66f0d`](https://github.com/apache/spark/commit/7b66f0d9d08ef7a35b467aa6b57568b409110ad0).
[GitHub] [spark] SparkQA commented on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views
SparkQA commented on issue #24200: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views URL: https://github.com/apache/spark/pull/24200#issuecomment-476958017 **[Test build #103999 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/103999/testReport)** for PR 24200 at commit [`7b66f0d`](https://github.com/apache/spark/commit/7b66f0d9d08ef7a35b467aa6b57568b409110ad0). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] HyukjinKwon commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB
HyukjinKwon commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB URL: https://github.com/apache/spark/pull/24226#issuecomment-476949248 Oops, I thought I had merged it to master.
[GitHub] [spark] chakravarthiT commented on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
chakravarthiT commented on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor URL: https://github.com/apache/spark/pull/24136#issuecomment-476948267 @maropu @HyukjinKwon I have handled the review comments. The UT won't fail now, as the message will be evaluated only when the specific log level is set.
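A minimal sketch of the "message is evaluated only when the log level is set" idea, assuming a by-name parameter is used for the message. The `Level` hierarchy, `Logger` class and `expensiveMessage` helper are hypothetical names for illustration and are not taken from the PR.

```scala
object LazyLogSketch {
  // Hypothetical log levels, ordered from most to least verbose.
  sealed abstract class Level(val rank: Int)
  case object Trace extends Level(0)
  case object Debug extends Level(1)
  case object Info extends Level(2)

  // Minimal logger: `msg` is a by-name parameter, so the expression passed in
  // is only evaluated when the level check succeeds.
  final class Logger(enabled: Level) {
    val emitted = scala.collection.mutable.ArrayBuffer.empty[String]
    def log(level: Level)(msg: => String): Unit =
      if (level.rank >= enabled.rank) emitted += msg
  }

  // Counts how often the (potentially expensive) message is actually built.
  var builds = 0
  def expensiveMessage(): String = { builds += 1; "plan after batch" }
}
```

With a logger enabled at `Info`, a call at `Trace` never builds the message, which is why a test that depends on the message not being evaluated would no longer fail.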
[GitHub] [spark] dilipbiswal commented on a change in pull request #24209: [SPARK-27255][SQL] Aggregate functions should not be allowed in WHERE
dilipbiswal commented on a change in pull request #24209: [SPARK-27255][SQL] Aggregate functions should not be allowed in WHERE URL: https://github.com/apache/spark/pull/24209#discussion_r269388977
## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ##
@@ -172,16 +176,15 @@ trait CheckAnalysis extends PredicateHelper {
             failAnalysis("Null-aware predicate sub-queries cannot be used in nested " +
               s"conditions: $condition")
 
+          case Filter(condition, _) if condition.find(isAggregateExpression(_)).isDefined =>
Review comment: @gatorsmile Thanks for reviewing. I will try it and get back.
[GitHub] [spark] beliefer edited a comment on issue #23841: [SPARK-26936][SQL] Fix bug of insert overwrite local dir can not create temporary path in local staging directory
beliefer edited a comment on issue #23841: [SPARK-26936][SQL] Fix bug of insert overwrite local dir can not create temporary path in local staging directory URL: https://github.com/apache/spark/pull/23841#issuecomment-476472507
> Will we hit this bug when we deploy spark in cluster? Seems to me it's not specific to yarn.

Yes. If Spark runs in `yarn-client` deploy mode, this bug will occur.
[GitHub] [spark] shaneknapp commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests
shaneknapp commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476944643
> :D Great! Please merge this if this is finally done, @shaneknapp !

i will do so tomorrow, after i poke around and test a bit more w/the env vars that @skonto mentioned earlier. :)
[GitHub] [spark] jose-torres commented on a change in pull request #23747: [SPARK-26848][SQL] Introduce new option to Kafka source: offset by timestamp (starting/ending)
jose-torres commented on a change in pull request #23747: [SPARK-26848][SQL] Introduce new option to Kafka source: offset by timestamp (starting/ending) URL: https://github.com/apache/spark/pull/23747#discussion_r269384961
## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ##
@@ -135,23 +135,79 @@ private[kafka010] class KafkaOffsetReader(
   def fetchSpecificOffsets(
       partitionOffsets: Map[TopicPartition, Long],
       reportDataLoss: String => Unit): KafkaSourceOffset = {
-    val fetched = runUninterruptibly {
-      withRetriesWithoutInterrupt {
-        // Poll to get the latest assigned partitions
-        consumer.poll(0)
-        val partitions = consumer.assignment()
+    val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
+      assert(partitions.asScala == partitionOffsets.keySet,
+        "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
+          "Use -1 for latest, -2 for earliest, if you don't care.\n" +
+          s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
+      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
+    }
 
-        // Call `position` to wait until the potential offset request triggered by `poll(0)` is
-        // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
-        // `poll(0)` may reset offsets that should have been set by another request.
-        partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+    val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ =>
+      partitionOffsets
+    }
+
+    val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { fetched =>
+      partitionOffsets.foreach {
+        case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+          off != KafkaOffsetRangeLimit.EARLIEST =>
+          if (fetched(tp) != off) {
+            reportDataLoss(
+              s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
+          }
+        case _ =>
+          // no real way to check that beginning or end is reasonable
+      }
+    }
+
+    fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets,
+      fnAssertFetchedOffsets)
+  }
+
+  def fetchSpecificTimestampBasedOffsets(topicTimestamps: Map[String, Long]): KafkaSourceOffset = {
+    val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
+      val assignedTopics = partitions.asScala.map(_.topic())
+      assert(assignedTopics == topicTimestamps.keySet,
+        "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
+          s"topics. Specified: ${topicTimestamps.keySet} Assigned: $assignedTopics")
+      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $topicTimestamps")
+    }
+
+    val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = {
+      partitions => {
+        val partitionTimestamps: ju.Map[TopicPartition, java.lang.Long] =
+          partitions.asScala.map { topicAndPartition =>
+            topicAndPartition -> java.lang.Long.valueOf(topicTimestamps(topicAndPartition.topic()))
+          }.toMap.asJava
+
+        val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
+          consumer.offsetsForTimes(partitionTimestamps)
Review comment: I can definitely see why the default-to-latest behavior could be helpful for things like data skew. But when push comes to shove, if I specify startingTimestamp = 200 and the output contains timestamp 195, I think users are going to consider that a pretty straightforward bug. The underlying Kafka consumer library doesn't have this behavior; it will keep returning null forever until an offset after the starting timestamp becomes available.
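The semantics described in the review comment can be sketched in plain Scala without a Kafka client. This is only an illustration under stated assumptions: `Record` and `offsetForTime` are hypothetical names, and `None` stands in for the `null` that the consumer library returns when no offset at or after the timestamp exists yet.

```scala
object TimestampOffsetSketch {
  // Hypothetical record: an offset and its timestamp within one partition.
  final case class Record(offset: Long, timestamp: Long)

  // Earliest offset whose timestamp is >= startingTimestamp, or None when no
  // such record exists yet. There is deliberately no fallback to the latest
  // offset here.
  def offsetForTime(records: Seq[Record], startingTimestamp: Long): Option[Long] =
    records.sortBy(_.offset).find(_.timestamp >= startingTimestamp).map(_.offset)
}
```

With records at timestamps 190 and 195, a lookup for 200 yields `None` rather than an offset, so a caller following this sketch would wait instead of starting from a position that could surface the timestamp-195 record.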
[GitHub] [spark] windpiger commented on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct
windpiger commented on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct URL: https://github.com/apache/spark/pull/24215#issuecomment-476931507
> Yea, I think so. How about checking `#distinctRowCount / rowCount` before optimization?

Right, a cost-driven approach would be better.
[GitHub] [spark] windpiger commented on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct
windpiger commented on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct URL: https://github.com/apache/spark/pull/24215#issuecomment-476931064
> Could this lead to performance regressions if the group by doesn't have reduction?

This PR provides a switch to turn it on/off (the default is false). It would be better to make this decision cost-driven.
[GitHub] [spark] maropu closed pull request #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB
maropu closed pull request #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB URL: https://github.com/apache/spark/pull/24226
[GitHub] [spark] maropu commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB
maropu commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB URL: https://github.com/apache/spark/pull/24226#issuecomment-476928566 I checked that most of the noisy warning logs are gone. Thanks! Merged to master.
[GitHub] [spark] HyukjinKwon commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB
HyukjinKwon commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB URL: https://github.com/apache/spark/pull/24226#issuecomment-476928355 Merged to master.
[GitHub] [spark] AmplabJenkins commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB
AmplabJenkins commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB URL: https://github.com/apache/spark/pull/24226#issuecomment-476927774 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/103995/ Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB
AmplabJenkins commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB URL: https://github.com/apache/spark/pull/24226#issuecomment-476927765 Merged build finished. Test PASSed.
[GitHub] [spark] SparkQA removed a comment on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB
SparkQA removed a comment on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB URL: https://github.com/apache/spark/pull/24226#issuecomment-476852479 **[Test build #103995 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/103995/testReport)** for PR 24226 at commit [`31e6945`](https://github.com/apache/spark/commit/31e6945b1e48729849a804d4426822eb565e8c94).
[GitHub] [spark] caneGuy commented on issue #23670: [SPARK-26601][SQL] Make broadcast-exchange thread pool configurable
caneGuy commented on issue #23670: [SPARK-26601][SQL] Make broadcast-exchange thread pool configurable URL: https://github.com/apache/spark/pull/23670#issuecomment-476927512 @dongjoon-hyun could you help check this PR? Thanks.
[GitHub] [spark] wypoon commented on a change in pull request #23767: [SPARK-26329][CORE] Faster polling of executor memory metrics.
wypoon commented on a change in pull request #23767: [SPARK-26329][CORE] Faster polling of executor memory metrics. URL: https://github.com/apache/spark/pull/23767#discussion_r269378096
## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ##
@@ -143,7 +143,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       execId: String,
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId,
-      executorUpdates: ExecutorMetrics): Boolean = true
+      executorUpdates: scala.collection.Map[(Int, Int), ExecutorMetrics]): Boolean = true
Review comment: Your question goes to the signature in TaskScheduler#executorHeartbeatReceived then. I guess I was used to using java.util.Map and java.util.List in java-land, so I thought the more general type is preferred in the signature. I used a scala.collection.Map for executorUpdates in the case class Heartbeat, and so that percolated through. Does it make a difference? It is true that I actually pass a mutable.Map in the Heartbeat.
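To make the "does it make a difference?" question concrete, here is a small sketch of what the general `scala.collection.Map` type buys in a signature: both immutable and mutable maps can be passed. `Metrics` and `maxPeak` are hypothetical stand-ins; only the `(Int, Int)` key shape comes from the diff.

```scala
object HeartbeatMapSketch {
  // Hypothetical stand-in for ExecutorMetrics: just a peak-memory figure.
  final case class Metrics(peakMemory: Long)

  // Declared with the general read-only scala.collection.Map, so callers may
  // pass either an immutable Map or a mutable.Map keyed by a pair of Ints.
  def maxPeak(updates: scala.collection.Map[(Int, Int), Metrics]): Long =
    if (updates.isEmpty) 0L else updates.values.map(_.peakMemory).max
}
```

The trade-off is that the callee cannot assume the map is immutable; a signature typed as `Map` from `Predef` (the immutable one) would reject a `mutable.Map` argument at compile time.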
[GitHub] [spark] SparkQA commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB
SparkQA commented on issue #24226: [SPARK-26660][FOLLOWUP] Raise task serialized size warning threshold to 1000 KiB URL: https://github.com/apache/spark/pull/24226#issuecomment-476927260 **[Test build #103995 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/103995/testReport)** for PR 24226 at commit [`31e6945`](https://github.com/apache/spark/commit/31e6945b1e48729849a804d4426822eb565e8c94). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] beliefer commented on issue #23841: [SPARK-26936][SQL] Fix bug of insert overwrite local dir can not create temporary path in local staging directory
beliefer commented on issue #23841: [SPARK-26936][SQL] Fix bug of insert overwrite local dir can not create temporary path in local staging directory URL: https://github.com/apache/spark/pull/23841#issuecomment-476924777
> That makes more sense if this isn't YARN-specific, but isn't this still using a local path as if it's remote, or am I misreading?

The SQL statement starts with `insert overwrite local directory`, which determines that the target path is local. An intermediate step first writes the result data to a temp path, and then moves the result data from the temp path to the local target path. The original code uses a local path as the temp path; this PR uses a distributed path as the temp path.
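The two-step flow described above (write to a temp path, then move to the target) can be sketched with `java.nio.file`. This is only an illustration of the flow: both directories are local here, whereas the PR is about using a distributed temp path, and `writeViaStaging` and its parameters are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object StagedWriteSketch {
  // Write `lines` to a file under `stagingDir` first, then move that file
  // into `targetDir`. Returns the final path of the moved file.
  def writeViaStaging(
      stagingDir: Path,
      targetDir: Path,
      name: String,
      lines: Seq[String]): Path = {
    Files.createDirectories(stagingDir)
    Files.createDirectories(targetDir)
    val staged = stagingDir.resolve(name)
    Files.write(staged, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
    Files.move(staged, targetDir.resolve(name), StandardCopyOption.REPLACE_EXISTING)
  }
}
```

The point of the staging step is that the target only ever receives complete files; where the staging directory lives is exactly what the PR changes.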
[GitHub] [spark] gatorsmile commented on a change in pull request #24209: [SPARK-27255][SQL] Aggregate functions should not be allowed in WHERE
gatorsmile commented on a change in pull request #24209: [SPARK-27255][SQL] Aggregate functions should not be allowed in WHERE URL: https://github.com/apache/spark/pull/24209#discussion_r269376472
## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ##
@@ -172,16 +176,15 @@ trait CheckAnalysis extends PredicateHelper {
             failAnalysis("Null-aware predicate sub-queries cannot be used in nested " +
               s"conditions: $condition")
 
+          case Filter(condition, _) if condition.find(isAggregateExpression(_)).isDefined =>
Review comment: call plan integrity checking here?
[GitHub] [spark] gatorsmile commented on issue #24209: [SPARK-27255][SQL] Aggregate functions should not be allowed in WHERE
gatorsmile commented on issue #24209: [SPARK-27255][SQL] Aggregate functions should not be allowed in WHERE URL: https://github.com/apache/spark/pull/24209#issuecomment-476924219 I think we should verify it in the analyzer stage. The plan integrity verification is just for ensuring the optimizer rules do not change the plan and then break the integrity.
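As a rough illustration of an analysis-time check of the kind discussed here, the sketch below searches a filter condition for an aggregate and rejects it. The expression classes are a hypothetical, heavily simplified tree and not Catalyst's; only the `find`-based shape of the check mirrors the diff under review.

```scala
object WhereAggregateCheckSketch {
  // Hypothetical mini expression tree (not Catalyst's classes).
  sealed trait Expr {
    def children: Seq[Expr]
    // Pre-order search for the first node satisfying `p`.
    def find(p: Expr => Boolean): Option[Expr] =
      if (p(this)) Some(this)
      else children.foldLeft(Option.empty[Expr])((acc, c) => acc.orElse(c.find(p)))
  }
  final case class Column(name: String) extends Expr { val children = Seq.empty[Expr] }
  final case class Lit(value: Int) extends Expr { val children = Seq.empty[Expr] }
  final case class GreaterThan(left: Expr, right: Expr) extends Expr {
    val children = Seq(left, right)
  }
  final case class Max(child: Expr) extends Expr { val children = Seq(child) }

  def isAggregate(e: Expr): Boolean = e.isInstanceOf[Max]

  // Reject a filter condition that contains an aggregate anywhere inside it.
  def checkFilter(condition: Expr): Either[String, Expr] =
    condition.find(isAggregate) match {
      case Some(agg) => Left(s"Aggregate functions are not allowed in WHERE: $agg")
      case None => Right(condition)
    }
}
```

Running the check once on the analyzed plan, as suggested, reports the problem to the user before any optimizer rule sees the plan.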
[GitHub] [spark] maropu commented on a change in pull request #24223: [SPARK-27278][SQL] Optimize GetMapValue when the map is a foldable and the key is not
maropu commented on a change in pull request #24223: [SPARK-27278][SQL] Optimize GetMapValue when the map is a foldable and the key is not URL: https://github.com/apache/spark/pull/24223#discussion_r269375161
## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala ##
@@ -59,6 +63,13 @@ object SimplifyExtractValueOps extends Rule[LogicalPlan] {
           Literal(null, ga.dataType)
         }
       case GetMapValue(CreateMap(elems), key) => CaseKeyWhen(key, elems)
+      case GetMapValue(Literal(map: MapData, MapType(kt, vt, _)), key) =>
Review comment: Since constant folding already covers the foldable cases, do we need to check that the key is non-foldable?
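A hedged sketch of one plausible rewrite for this case: a lookup into a constant map with a non-constant key becomes a CASE chain, mirroring the existing `CreateMap` case. The right-hand side of the new case is not shown in the diff, so this is an assumption, and all classes below are a hypothetical mini expression language rather than Catalyst's.

```scala
object MapLookupRewriteSketch {
  sealed trait Expr
  final case class Lit(value: Option[Int]) extends Expr           // None models NULL
  final case class Col(name: String) extends Expr                 // non-foldable key
  final case class MapLit(entries: Seq[(Int, Int)]) extends Expr  // constant map
  final case class GetMapValue(map: Expr, key: Expr) extends Expr
  // Models: CASE key WHEN k1 THEN v1 ... ELSE NULL END
  final case class CaseKeyWhen(key: Expr, branches: Seq[(Lit, Lit)]) extends Expr

  // Rewrite only when the key is not itself a literal; a literal key is left
  // alone here, on the assumption that constant folding handles it.
  def simplify(e: Expr): Expr = e match {
    case GetMapValue(MapLit(entries), key) if !key.isInstanceOf[Lit] =>
      CaseKeyWhen(key, entries.map { case (k, v) => (Lit(Some(k)), Lit(Some(v))) })
    case other => other
  }

  // Tiny evaluator used to check that the rewrite preserves results.
  def eval(e: Expr, row: Map[String, Int]): Option[Int] = e match {
    case Lit(v) => v
    case Col(n) => row.get(n)
    case GetMapValue(MapLit(entries), key) =>
      eval(key, row).flatMap(k => entries.toMap.get(k))
    case CaseKeyWhen(key, branches) =>
      eval(key, row).flatMap { k =>
        branches.collectFirst { case (Lit(Some(`k`)), v) => v }.flatMap(_.value)
      }
    case _ => None
  }
}
```

The guard on `key` corresponds to the reviewer's question: without it, the rule and constant folding would both apply to a fully constant lookup.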
[GitHub] [spark] viirya commented on issue #24220: [SPARK-27288][SQL] Pruning nested field in complex map key from object serializers
viirya commented on issue #24220: [SPARK-27288][SQL] Pruning nested field in complex map key from object serializers URL: https://github.com/apache/spark/pull/24220#issuecomment-476920990 @dongjoon-hyun Thanks! Created a new JIRA for this.
[GitHub] [spark] wangjiaochun commented on a change in pull request #24080: [SPARK-27147][TEST]Create new unit test cases for SortShuffleWriter
wangjiaochun commented on a change in pull request #24080: [SPARK-27147][TEST]Create new unit test cases for SortShuffleWriter URL: https://github.com/apache/spark/pull/24080#discussion_r269371993
## File path: core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala ##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import org.mockito.Mockito._
+import org.mockito.MockitoAnnotations
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{ShuffleDependency, _}
+import org.apache.spark.memory.MemoryTestingUtils
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver}
+import org.apache.spark.util.Utils
+
+class SortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  private val shuffleId = 0
+  private val numMaps = 5
+  private val conf: SparkConf = new SparkConf(loadDefaults = false)
+  private val sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+  private var shuffleHandle: BaseShuffleHandle[Int, Int, Int] = _
+  private val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
+  private val serializer = new JavaSerializer(conf)
+  private val context = MemoryTestingUtils.fakeTaskContext(sc.env)
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    MockitoAnnotations.initMocks(this)
+    val partitioner = new Partitioner() {
+      def numPartitions = numMaps
+      def getPartition(key: Any) = Utils.nonNegativeMod(key.hashCode, numPartitions)
+    }
+    shuffleHandle = {
+      val dependency = mock(classOf[ShuffleDependency[Int, Int, Int]])
+      when(dependency.partitioner).thenReturn(partitioner)
+      when(dependency.serializer).thenReturn(serializer)
+      when(dependency.aggregator).thenReturn(None)
+      when(dependency.keyOrdering).thenReturn(None)
+      new BaseShuffleHandle(shuffleId, numMaps = numMaps, dependency)
+    }
+  }
+
+  override def afterEach(): Unit = {
Review comment: `shuffleBlockResolver.stop()` is actually an empty method that does nothing, so I think there is no need to call `stop`. The unit test IndexShuffleBlockResolverSuite does not call `stop` either.
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24195: [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp
HyukjinKwon commented on a change in pull request #24195: [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp URL: https://github.com/apache/spark/pull/24195#discussion_r269368914
## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ##
@@ -1231,6 +1238,11 @@ case class MonthsBetween(
 case class ToUTCTimestamp(left: Expression, right: Expression)
   extends BinaryExpression with ImplicitCastInputTypes {
+  if (!SQLConf.get.utcTimestampFuncEnabled) {
+    throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0. " +
+      s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.")
+  }
Review comment: @cloud-fan, I don't think this is the right way to deprecate. We should at least log a warning instead, or better, have a proper mechanism for deprecation on the SQL side. Do we really want to add a configuration for every deprecation?
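For contrast with throwing behind a configuration flag, here is a minimal sketch of the "warn instead" alternative the comment suggests. It is not Spark's mechanism: `deprecated`, `warnings` and `alreadyWarned` are hypothetical names, and a real implementation would presumably send the message to a logger rather than a buffer.

```scala
object DeprecationWarningSketch {
  // Collected warnings; a real system would send these to its logger instead.
  val warnings = scala.collection.mutable.ArrayBuffer.empty[String]
  private val alreadyWarned = scala.collection.mutable.Set.empty[String]

  // Run `body`, recording a one-time warning for `name` rather than failing.
  def deprecated[A](name: String, since: String)(body: => A): A = {
    if (alreadyWarned.add(name)) {
      warnings += s"The $name function is deprecated since $since and may be removed later."
    }
    body
  }
}
```

A wrapper like this keeps existing queries working and needs no per-function configuration key, which is the cost the comment is worried about.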
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24195: [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp
HyukjinKwon commented on a change in pull request #24195: [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp URL: https://github.com/apache/spark/pull/24195#discussion_r269368585
## File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ##
@@ -3017,6 +3019,7 @@ object functions {
    * @group datetime_funcs
    * @since 1.5.0
    */
+  @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
Review comment: BTW, it seems we should deprecate this on the Python side as well (see https://github.com/apache/spark/commit/d9798c834f3fed060cfd18a8d38c398cb2efcc82 for examples), and on the R side as well (see https://github.com/apache/spark/commit/0025a8397f8723011917239fe47518457d4d6860#diff-d97f9adc2dcac0703568c799ff106987R352 for instance).
[GitHub] [spark] wangyum commented on issue #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1
wangyum commented on issue #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1 URL: https://github.com/apache/spark/pull/24119#issuecomment-476907659 @liancheng Yes. It's a subset of #23788.
[GitHub] [spark] maropu commented on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct
maropu commented on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct URL: https://github.com/apache/spark/pull/24215#issuecomment-476907430 Yea, I think so. How about checking `#distinctRowCount / rowCount` before optimization?
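A check on `#distinctRowCount / rowCount` like the one proposed above could be sketched as follows. The object name, the method name, and the `0.5` cut-off are all assumptions made for illustration; the thread does not say which threshold, if any, would be used, or where the statistics would come from.

```scala
// Hypothetical sketch, not Spark code: decide whether adding a group-by is
// likely to shrink the input, based on estimated statistics.
object DistinctRatioCheck {
  // Assumed cut-off: only aggregate when at most half of the rows are distinct.
  val DefaultMaxRatio: Double = 0.5

  def worthAggregating(
      distinctRowCount: BigInt,
      rowCount: BigInt,
      maxRatio: Double = DefaultMaxRatio): Boolean = {
    if (rowCount <= 0) {
      // No usable statistics: skip the rewrite to avoid a possible regression.
      false
    } else {
      (BigDecimal(distinctRowCount) / BigDecimal(rowCount)).toDouble <= maxRatio
    }
  }
}
```

A ratio close to 1 means the group-by removes almost nothing, in which case the extra aggregation would mostly add cost.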
[GitHub] [spark] dongjoon-hyun commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests
dongjoon-hyun commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476905635 :D Great! Please merge this if this is finally done, @shaneknapp !
[GitHub] [spark] liancheng commented on issue #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1
liancheng commented on issue #24119: [SPARK-27182][SQL] Move the conflict source code of the sql/core module to sql/core/v1.2.1 URL: https://github.com/apache/spark/pull/24119#issuecomment-476904194 Hey @wangyum, sorry for the long delay. IIUC, this PR is basically a subset of #23788. Once this one is merged, we can simplify #23788 by removing changes made in this PR, right? In that case, this one LGTM.
[GitHub] [spark] wypoon commented on a change in pull request #23767: [SPARK-26329][CORE] Faster polling of executor memory metrics.
wypoon commented on a change in pull request #23767: [SPARK-26329][CORE] Faster polling of executor memory metrics. URL: https://github.com/apache/spark/pull/23767#discussion_r269357625 ## File path: core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import java.lang.Long.{MAX_VALUE => LONG_MAX_VALUE} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray} + +import scala.collection.mutable.HashMap + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryManager +import org.apache.spark.metrics.ExecutorMetricType +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * :: DeveloperApi :: + * A class that polls executor metrics, and tracks their peaks per task and per stage. + * Each executor keeps an instance of this class. + * + * @param memoryManager the memory manager used by the executor. + * @param pollingInterval the polling interval in milliseconds. 
+ */ +@DeveloperApi +private[spark] class ExecutorMetricsPoller( +memoryManager: MemoryManager, +pollingInterval: Long) + extends Logging { + + type StageKey = (Int, Int) + // tuple for Task Count and Metric Peaks + type TCMP = (AtomicLong, AtomicLongArray) + + // Map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks) + private val stageTCMP = new ConcurrentHashMap[StageKey, TCMP] + + // Map of taskId to executor metric peaks + private val taskMetricPeaks = new ConcurrentHashMap[Long, AtomicLongArray] + + private val poller = +if (pollingInterval > 0) { + ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-poller") +} else { + null +} + + /** + * Function to poll executor metrics. + * On start, if pollingInterval is positive, this is scheduled to run at that interval. + * Otherwise, this is called by the reportHeartBeat function defined in Executor and passed + * to its Heartbeater. + */ + def poll(): Unit = { +// Note: Task runner threads may update stageTCMP or read from taskMetricPeaks concurrently +// with this function via calls to methods of this class. + +// get the latest values for the metrics +val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager) + +def compareAndUpdate(current: Long, latest: Long): Long = + if (latest > current) latest else current + +def updatePeaks(metrics: AtomicLongArray): Unit = { + (0 until metrics.length).foreach { i => +metrics.getAndAccumulate(i, latestMetrics(i), compareAndUpdate) + } +} + +// for each active stage, update the peaks +stageTCMP.forEachValue(LONG_MAX_VALUE, v => updatePeaks(v._2)) + +// for each running task, update the peaks +taskMetricPeaks.forEachValue(LONG_MAX_VALUE, updatePeaks) + } + + /** Starts the polling thread. 
*/ + def start(): Unit = { +if (poller != null) { + val pollingTask = new Runnable() { +override def run(): Unit = Utils.logUncaughtExceptions(poll()) + } + poller.scheduleAtFixedRate(pollingTask, 0L, pollingInterval, TimeUnit.MILLISECONDS) +} + } + + /** + * Called by Executor#launchTask. + * + * @param taskId the id of the task being launched. + */ + def onTaskLaunch(taskId: Long): Unit = { Review comment: Combined with onTaskStart.
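The peak-tracking step in the quoted `poll()` method relies on `AtomicLongArray.getAndAccumulate` with a "keep the larger value" function. A stand-alone sketch of that technique, with no Spark dependencies, is shown below; `PeakTracker` and its `updatePeaks` signature are illustrative names, not the code under review.

```scala
import java.util.concurrent.atomic.AtomicLongArray
import java.util.function.LongBinaryOperator

// Illustrative sketch: atomically keep the per-slot maximum of polled values.
object PeakTracker {
  // Accumulator passed to getAndAccumulate: keeps the larger of the stored
  // value and the newly polled value.
  private val keepMax: LongBinaryOperator = new LongBinaryOperator {
    override def applyAsLong(current: Long, latest: Long): Long =
      math.max(current, latest)
  }

  /** Raises each slot of `peaks` to `latest(i)` if that value is larger. */
  def updatePeaks(peaks: AtomicLongArray, latest: Array[Long]): Unit = {
    require(peaks.length == latest.length, "metric arrays must have equal length")
    (0 until peaks.length).foreach { i =>
      peaks.getAndAccumulate(i, latest(i), keepMax)
    }
  }
}
```

Because each slot is updated atomically, a polling thread and task threads can touch the same array without an explicit lock, although the array as a whole is not updated as a single atomic snapshot.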
[GitHub] [spark] rxin commented on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct
rxin commented on issue #24215: [SPARK-27229][SQL] GroupBy Placement in Intersect Distinct URL: https://github.com/apache/spark/pull/24215#issuecomment-476895292 Could this lead to performance regressions if the group by doesn't reduce the number of rows?
[GitHub] [spark] skonto commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests
skonto commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476894285 @shaneknapp looks stable.
[GitHub] [spark] wypoon commented on a change in pull request #23767: [SPARK-26329][CORE] Faster polling of executor memory metrics.
wypoon commented on a change in pull request #23767: [SPARK-26329][CORE] Faster polling of executor memory metrics. URL: https://github.com/apache/spark/pull/23767#discussion_r269352335 ## File path: core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import java.lang.Long.{MAX_VALUE => LONG_MAX_VALUE} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray} + +import scala.collection.mutable.HashMap + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryManager +import org.apache.spark.metrics.ExecutorMetricType +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * :: DeveloperApi :: + * A class that polls executor metrics, and tracks their peaks per task and per stage. + * Each executor keeps an instance of this class. + * + * @param memoryManager the memory manager used by the executor. + * @param pollingInterval the polling interval in milliseconds. 
+ */ +@DeveloperApi +private[spark] class ExecutorMetricsPoller( +memoryManager: MemoryManager, +pollingInterval: Long) + extends Logging { + + type StageKey = (Int, Int) + // tuple for Task Count and Metric Peaks + type TCMP = (AtomicLong, AtomicLongArray) + + // Map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks) + private val stageTCMP = new ConcurrentHashMap[StageKey, TCMP] + + // Map of taskId to executor metric peaks + private val taskMetricPeaks = new ConcurrentHashMap[Long, AtomicLongArray] + + private val poller = +if (pollingInterval > 0) { + ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-poller") +} else { + null +} + + /** + * Function to poll executor metrics. + * On start, if pollingInterval is positive, this is scheduled to run at that interval. + * Otherwise, this is called by the reportHeartBeat function defined in Executor and passed + * to its Heartbeater. + */ + def poll(): Unit = { +// Note: Task runner threads may update stageTCMP or read from taskMetricPeaks concurrently +// with this function via calls to methods of this class. + +// get the latest values for the metrics +val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager) + +def compareAndUpdate(current: Long, latest: Long): Long = + if (latest > current) latest else current + +def updatePeaks(metrics: AtomicLongArray): Unit = { + (0 until metrics.length).foreach { i => +metrics.getAndAccumulate(i, latestMetrics(i), compareAndUpdate) + } +} + +// for each active stage, update the peaks +stageTCMP.forEachValue(LONG_MAX_VALUE, v => updatePeaks(v._2)) + +// for each running task, update the peaks +taskMetricPeaks.forEachValue(LONG_MAX_VALUE, updatePeaks) + } + + /** Starts the polling thread. 
*/ + def start(): Unit = { +if (poller != null) { + val pollingTask = new Runnable() { +override def run(): Unit = Utils.logUncaughtExceptions(poll()) + } + poller.scheduleAtFixedRate(pollingTask, 0L, pollingInterval, TimeUnit.MILLISECONDS) +} + } + + /** + * Called by Executor#launchTask. + * + * @param taskId the id of the task being launched. + */ + def onTaskLaunch(taskId: Long): Unit = { +taskMetricPeaks.put(taskId, new AtomicLongArray(ExecutorMetricType.numMetrics)) + } + + /** + * Called by TaskRunner#run. + * + * @param stageId the id of the stage the task belongs to. + * @param stageAttemptId the attempt number of the stage the task belongs to. + */ + def onTaskStart(stageId: Int, stageAttemptId: Int): Unit = { +// Put a new entry in stageTCMP for the stage if there isn't one already. +// Increment the task count. +val (count, _) = stageTCMP.computeIfAbsent((stageId, stageAttemptId), + _ => (new AtomicLong(0), new AtomicLongArray(ExecutorMetricType.numMetrics))) +val stageCount = count.incrementAndGet() +logDebug(s"stageTCMP: ($stageId, $stageAttemptId) ->
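The `onTaskStart` logic quoted above (truncated in this archive) creates a per-stage entry on first use and then increments its task count. A minimal sketch of that `computeIfAbsent` plus `incrementAndGet` pattern follows; `StageTaskCounter` is a hypothetical name, and the metric-peaks half of the tuple is left out for brevity.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.function.{Function => JFunction}

// Illustrative sketch: count running tasks per (stageId, stageAttemptId).
object StageTaskCounter {
  type StageKey = (Int, Int)

  private val counts = new ConcurrentHashMap[StageKey, AtomicLong]

  // Factory used by computeIfAbsent; it runs only when the key is missing.
  private val newCounter: JFunction[StageKey, AtomicLong] =
    new JFunction[StageKey, AtomicLong] {
      override def apply(key: StageKey): AtomicLong = new AtomicLong(0)
    }

  /** Registers one more running task and returns the updated count. */
  def onTaskStart(stageId: Int, stageAttemptId: Int): Long =
    counts.computeIfAbsent((stageId, stageAttemptId), newCounter).incrementAndGet()
}
```

`computeIfAbsent` ensures that two tasks of the same stage starting concurrently share one counter rather than each installing their own.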
[GitHub] [spark] AmplabJenkins removed a comment on issue #23514: [SPARK-24902][K8s] Add PV integration tests
AmplabJenkins removed a comment on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476891741 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #23514: [SPARK-24902][K8s] Add PV integration tests
AmplabJenkins removed a comment on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476891749 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/9350/ Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests
AmplabJenkins commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476891749 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/9350/ Test PASSed.
[GitHub] [spark] SparkQA commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests
SparkQA commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476891729 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/9350/
[GitHub] [spark] AmplabJenkins commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests
AmplabJenkins commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476891741 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
AmplabJenkins removed a comment on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader URL: https://github.com/apache/spark/pull/24225#issuecomment-476890580 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/103989/ Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
AmplabJenkins commented on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader URL: https://github.com/apache/spark/pull/24225#issuecomment-476890580 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/103989/ Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
AmplabJenkins removed a comment on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor URL: https://github.com/apache/spark/pull/24136#issuecomment-476890393 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
AmplabJenkins removed a comment on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor URL: https://github.com/apache/spark/pull/24136#issuecomment-476890397 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/103991/ Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
AmplabJenkins removed a comment on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader URL: https://github.com/apache/spark/pull/24225#issuecomment-476890577 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
AmplabJenkins commented on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader URL: https://github.com/apache/spark/pull/24225#issuecomment-476890577 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
AmplabJenkins commented on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor URL: https://github.com/apache/spark/pull/24136#issuecomment-476890397 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/103991/ Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
AmplabJenkins commented on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor URL: https://github.com/apache/spark/pull/24136#issuecomment-476890393 Merged build finished. Test PASSed.
[GitHub] [spark] SparkQA removed a comment on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
SparkQA removed a comment on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor URL: https://github.com/apache/spark/pull/24136#issuecomment-476798531 **[Test build #103991 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/103991/testReport)** for PR 24136 at commit [`59b45d1`](https://github.com/apache/spark/commit/59b45d12e5c9662e15a626be803635413d8c18dd).
[GitHub] [spark] SparkQA removed a comment on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
SparkQA removed a comment on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader URL: https://github.com/apache/spark/pull/24225#issuecomment-476794844 **[Test build #103989 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/103989/testReport)** for PR 24225 at commit [`75387c8`](https://github.com/apache/spark/commit/75387c87134839709b5f0aad2eb7154d5844bfb0).
[GitHub] [spark] SparkQA commented on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
SparkQA commented on issue #24225: [WIP][SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader URL: https://github.com/apache/spark/pull/24225#issuecomment-476889909 **[Test build #103989 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/103989/testReport)** for PR 24225 at commit [`75387c8`](https://github.com/apache/spark/commit/75387c87134839709b5f0aad2eb7154d5844bfb0). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA commented on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
SparkQA commented on issue #24136: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor URL: https://github.com/apache/spark/pull/24136#issuecomment-476889725 **[Test build #103991 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/103991/testReport)** for PR 24136 at commit [`59b45d1`](https://github.com/apache/spark/commit/59b45d12e5c9662e15a626be803635413d8c18dd). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA removed a comment on issue #23514: [SPARK-24902][K8s] Add PV integration tests
SparkQA removed a comment on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476884691 **[Test build #104000 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/104000/testReport)** for PR 23514 at commit [`6cfb751`](https://github.com/apache/spark/commit/6cfb75156accebaf40fe11d686b180457732e0a9).
[GitHub] [spark] AmplabJenkins commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests
AmplabJenkins commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476888613 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/104000/ Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #23514: [SPARK-24902][K8s] Add PV integration tests
AmplabJenkins removed a comment on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476888613 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/104000/ Test PASSed.
[GitHub] [spark] AmplabJenkins removed a comment on issue #23514: [SPARK-24902][K8s] Add PV integration tests
AmplabJenkins removed a comment on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476888611 Merged build finished. Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests
AmplabJenkins commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476888611 Merged build finished. Test PASSed.
[GitHub] [spark] SparkQA commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests
SparkQA commented on issue #23514: [SPARK-24902][K8s] Add PV integration tests URL: https://github.com/apache/spark/pull/23514#issuecomment-476888529 **[Test build #104000 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/104000/testReport)** for PR 23514 at commit [`6cfb751`](https://github.com/apache/spark/commit/6cfb75156accebaf40fe11d686b180457732e0a9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] AmplabJenkins removed a comment on issue #24208: [SPARK-27272][CORE] Enable blacklisting of node/executor on fetch failures by default
AmplabJenkins removed a comment on issue #24208: [SPARK-27272][CORE] Enable blacklisting of node/executor on fetch failures by default URL: https://github.com/apache/spark/pull/24208#issuecomment-476888072 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/103987/ Test PASSed.
[GitHub] [spark] AmplabJenkins commented on issue #24208: [SPARK-27272][CORE] Enable blacklisting of node/executor on fetch failures by default
AmplabJenkins commented on issue #24208: [SPARK-27272][CORE] Enable blacklisting of node/executor on fetch failures by default URL: https://github.com/apache/spark/pull/24208#issuecomment-476888067 Merged build finished. Test PASSed.
[GitHub] [spark] dilipbiswal commented on issue #24224: [SPARK-27285] Support describing output of CTE
dilipbiswal commented on issue #24224: [SPARK-27285] Support describing output of CTE URL: https://github.com/apache/spark/pull/24224#issuecomment-476888202 Thank you very much @cloud-fan