[jira] [Commented] (SPARK-27465) Kafka Client 0.11.0.0 is not Supporting the kafkatestutils package
[ https://issues.apache.org/jira/browse/SPARK-27465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818658#comment-16818658 ] shahid commented on SPARK-27465: I will analyze the issue. > Kafka Client 0.11.0.0 is not Supporting the kafkatestutils package > -- > > Key: SPARK-27465 > URL: https://issues.apache.org/jira/browse/SPARK-27465 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.4.0, 2.4.1 >Reporter: Praveen >Priority: Critical > > Hi Team, > We are getting the below exceptions with Kafka Client Version 0.11.0.0 for > the KafkaTestUtils package. But it's working fine when we use Kafka Client > Version 0.10.0.1. Please suggest the way forward. We are using the package " > import org.apache.spark.streaming.kafka010.KafkaTestUtils;" > > ERROR: > java.lang.NoSuchMethodError: > kafka.server.KafkaServer$.$lessinit$greater$default$2()Lkafka/utils/Time; > at > org.apache.spark.streaming.kafka010.KafkaTestUtils$$anonfun$setupEmbeddedKafkaServer$2.apply(KafkaTestUtils.scala:110) > at > org.apache.spark.streaming.kafka010.KafkaTestUtils$$anonfun$setupEmbeddedKafkaServer$2.apply(KafkaTestUtils.scala:107) > at > org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2234) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2226) > at > org.apache.spark.streaming.kafka010.KafkaTestUtils.setupEmbeddedKafkaServer(KafkaTestUtils.scala:107) > at > org.apache.spark.streaming.kafka010.KafkaTestUtils.setup(KafkaTestUtils.scala:122) > at > com.netcracker.rms.smart.esp.ESPTestEnv.prepareKafkaTestUtils(ESPTestEnv.java:203) > at com.netcracker.rms.smart.esp.ESPTestEnv.setUp(ESPTestEnv.java:157) > at > com.netcracker.rms.smart.esp.TestEventStreamProcessor.setUp(TestEventStreamProcessor.java:58) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27468) "Storage Level" in "RDD Storage Page" is not correct
[ https://issues.apache.org/jira/browse/SPARK-27468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818662#comment-16818662 ] shahid commented on SPARK-27468: I would like to analyze the issue. > "Storage Level" in "RDD Storage Page" is not correct > > > Key: SPARK-27468 > URL: https://issues.apache.org/jira/browse/SPARK-27468 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.1 >Reporter: Shixiong Zhu >Priority: Major > > I ran the following unit test and checked the UI. > {code} > val conf = new SparkConf() > .setAppName("test") > .setMaster("local-cluster[2,1,1024]") > .set("spark.ui.enabled", "true") > sc = new SparkContext(conf) > val rdd = sc.makeRDD(1 to 10, 1).persist(StorageLevel.MEMORY_ONLY_2) > rdd.count() > Thread.sleep(360) > {code} > The storage level is "Memory Deserialized 1x Replicated" in the RDD storage > page. > I tried to debug and found this is because Spark emitted the following two > events: > {code} > event: SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(1, > 10.8.132.160, 65473, None),rdd_0_0,StorageLevel(memory, deserialized, 2 > replicas),56,0)) > event: SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(0, > 10.8.132.160, 65474, None),rdd_0_0,StorageLevel(memory, deserialized, 1 > replicas),56,0)) > {code} > The storage level in the second event will overwrite the first one. "1 > replicas" comes from this line: > https://github.com/apache/spark/blob/3ab96d7acf870e53c9016b0b63d0b328eec23bed/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1457 > Maybe AppStatusListener should calculate the replicas from events? > Another fact we may need to think about is when replicas is 2, will two Spark > events arrive in the same order? Currently, two RPCs from different executors > can arrive in any order. 
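The fix direction suggested above ("calculate the replicas from events") can be illustrated with a small, self-contained sketch. This is plain Python with made-up helper names, not Spark's AppStatusListener code: it counts the distinct block managers that currently hold each block, instead of trusting the storage level carried by the latest event.

```python
# Sketch (not Spark's actual implementation): derive the effective
# replication of an RDD block from a stream of block-update events,
# rather than from the storage level in the most recent event.
# The (manager_id, block_id, mem_size) tuples are simplified
# stand-ins for SparkListenerBlockUpdated / BlockUpdatedInfo.
from collections import defaultdict

def effective_replication(events):
    """events: iterable of (block_manager_id, block_id, mem_size).
    Returns {block_id: replica_count}, counting the distinct block
    managers that currently hold a non-empty copy of each block."""
    holders = defaultdict(set)
    for manager_id, block_id, mem_size in events:
        if mem_size > 0:
            holders[block_id].add(manager_id)
        else:
            holders[block_id].discard(manager_id)  # block was dropped
    return {blk: len(mgrs) for blk, mgrs in holders.items()}

# The two events from this report: executors 1 and 0 both store rdd_0_0,
# so the effective replication is 2 regardless of event arrival order.
events = [
    ("BlockManagerId(1, 10.8.132.160, 65473)", "rdd_0_0", 56),
    ("BlockManagerId(0, 10.8.132.160, 65474)", "rdd_0_0", 56),
]
print(effective_replication(events))  # rdd_0_0 is held by 2 managers
```

Counting holders is also insensitive to event ordering, which addresses the second concern (RPCs from different executors arriving in any order).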
[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support
[ https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818649#comment-16818649 ] Bryan Cutler commented on SPARK-27396: -- Thanks for this [~revans2], overall I think the proposal sounds good and there will be some nice benefits for increasing support for columnar processing in Spark. I saw you mentioned Pandas UDF a couple times, but does this SPIP also cover the related changes for the PySpark API? Some things might not translate exactly from the Scala APIs, like what does an {{RDD[ColumnarBatch]}} mean in Python? I also have some concerns about the APIs for goal #3, data transfers to DL/ML frameworks. This is essentially SPARK-24579, IIUC. My concern is that creating a {{columnar_udf}} might not be the best interface for this. First off, if done in PySpark then the user would provide a Python function, which means all the data must be sent to the Python worker process before it is sent elsewhere. This prevents the user from doing a more optimal data exchange that might go directly from the Spark JVM to another framework, like a TensorFlow C++ kernel, skipping Python entirely. Secondly, I'm not sure that the average user will take to working with low-level {{ColumnarBatches}} in a UDF. Even if the data is in Arrow form, there are a lot of differences between the Arrow Java and Python implementations which could be confusing for the end user. I think something like a plugin interface, where specialized connectors would handle the low-level transfer and could be invoked the same in Python or Java/Scala, might be better in the long run. Having a connector which executes a UDF would still be useful for advanced users. I don't know if this is out of scope for the SPIP, but I wouldn't want us to get stuck with a {{columnar_udf}} API that is limited and not user-friendly. 
> SPIP: Public APIs for extended Columnar Processing Support > -- > > Key: SPARK-27396 > URL: https://issues.apache.org/jira/browse/SPARK-27396 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Robert Joseph Evans >Priority: Major > > *Q1.* What are you trying to do? Articulate your objectives using absolutely > no jargon. > > The Dataset/DataFrame API in Spark currently only exposes to users one row at > a time when processing data. The goals of this are to > > # Expose to end users a new option of processing the data in a columnar > format, multiple rows at a time, with the data organized into contiguous > arrays in memory. > # Make any transitions between the columnar memory layout and a row based > layout transparent to the end user. > # Allow for simple data exchange with other systems, DL/ML libraries, > pandas, etc. by having clean APIs to transform the columnar data into an > Apache Arrow compatible layout. > # Provide a plugin mechanism for columnar processing support so an advanced > user could avoid data transition between columnar and row based processing > even through shuffles. This means we should at least support pluggable APIs > so an advanced end user can implement the columnar partitioning themselves, > and provide the glue necessary to shuffle the data still in a columnar format. > # Expose new APIs that allow advanced users or frameworks to implement > columnar processing either as UDFs, or by adjusting the physical plan to do > columnar processing. If the latter is too controversial we can move it to > another SPIP, but we plan to implement some accelerated computing in parallel > with this feature to be sure the APIs work, and without this feature it makes > that impossible. > > Not Requirements, but things that would be nice to have. > # Provide default implementations for partitioning columnar data, so users > don’t have to. 
> # Transition the existing in memory columnar layouts to be compatible with > Apache Arrow. This would make the transformations to Apache Arrow format a > no-op. The existing formats are already very close to those layouts in many > cases. This would not be using the Apache Arrow java library, but instead > being compatible with the memory > [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a > subset of that layout. > # Provide a clean transition from the existing code to the new one. The > existing APIs which are public but evolving are not that far off from what is > being proposed. We should be able to create a new parallel API that can wrap > the existing one. This means any file format that is trying to support > columnar can still do so until we make a conscious decision to deprecate and > then turn off the old APIs. > > *Q2.* What problem is this proposal NOT designed to solve? >
[jira] [Commented] (SPARK-25348) Data source for binary files
[ https://issues.apache.org/jira/browse/SPARK-25348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818640#comment-16818640 ] Xiangrui Meng commented on SPARK-25348: --- I created two follow-up tasks: * Documentation: SPARK-27472 * Filter push down: SPARK-27473 > Data source for binary files > > > Key: SPARK-25348 > URL: https://issues.apache.org/jira/browse/SPARK-25348 > Project: Spark > Issue Type: Story > Components: SQL >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Weichen Xu >Priority: Major > > It would be useful to have a data source implementation for binary files, > which can be used to build features to load images, audio, and videos. > Microsoft has an implementation at > [https://github.com/Azure/mmlspark/tree/master/src/io/binary.] It would be > great if we can merge it into the Spark main repo. > cc: [~mhamilton] and [~imatiach] > Proposed API: > Format name: "binaryFile" > Schema: > * content: BinaryType > * status (following Hadoop FileStatus): > ** path: StringType > ** modificationTime: Timestamp > ** length: LongType (size limit 2GB) > Options: > * pathGlobFilter: only include files with path matching the glob pattern > Input partition size can be controlled by common SQL confs: maxPartitionBytes > and openCostInBytes
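To make the proposed schema concrete, here is a plain-Python illustration (not Spark code; `read_binary_files` is a hypothetical stand-in) of what a "binaryFile" record would contain and how the `pathGlobFilter` option behaves:

```python
# Plain-Python sketch of the proposed "binaryFile" record layout:
# for each file matching a glob, one record with the raw content plus
# a Hadoop-FileStatus-like status (path, modificationTime, length).
# This only illustrates the proposed semantics; it is not Spark code.
import glob
import os
import tempfile

def read_binary_files(directory, path_glob_filter="*"):
    records = []
    for path in sorted(glob.glob(os.path.join(directory, path_glob_filter))):
        st = os.stat(path)
        with open(path, "rb") as f:
            content = f.read()
        records.append({
            "content": content,                    # BinaryType
            "status": {
                "path": path,                      # StringType
                "modificationTime": st.st_mtime,   # Timestamp
                "length": st.st_size,              # LongType
            },
        })
    return records

with tempfile.TemporaryDirectory() as d:
    for name, data in [("a.png", b"\x89PNG"), ("b.txt", b"hi")]:
        with open(os.path.join(d, name), "wb") as f:
            f.write(data)
    # pathGlobFilter: only include files whose path matches the glob
    pngs = read_binary_files(d, "*.png")
    print(len(pngs), pngs[0]["status"]["length"])  # one file, 4 bytes
```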
[jira] [Comment Edited] (SPARK-25348) Data source for binary files
[ https://issues.apache.org/jira/browse/SPARK-25348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818640#comment-16818640 ] Xiangrui Meng edited comment on SPARK-25348 at 4/16/19 5:19 AM: I created two follow-up tasks: * Documentation: SPARK-27472 * Filter push down: SPARK-27473 was (Author: mengxr): I created two follow-up tasks: * DocumentationL SPARK-27472 * Filter push down: SPARK-27473 > Data source for binary files > > > Key: SPARK-25348 > URL: https://issues.apache.org/jira/browse/SPARK-25348 > Project: Spark > Issue Type: Story > Components: SQL >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Weichen Xu >Priority: Major > > It would be useful to have a data source implementation for binary files, > which can be used to build features to load images, audio, and videos. > Microsoft has an implementation at > [https://github.com/Azure/mmlspark/tree/master/src/io/binary.] It would be > great if we can merge it into Spark main repo. > cc: [~mhamilton] and [~imatiach] > Proposed API: > Format name: "binaryFile" > Schema: > * content: BinaryType > * status (following Hadoop FIleStatus): > ** path: StringType > ** modificationTime: Timestamp > ** length: LongType (size limit 2GB) > Options: > * pathGlobFilter: only include files with path matching the glob pattern > Input partition size can be controlled by common SQL confs: maxPartitionBytes > and openCostInBytes -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27473) Support filter push down for status fields in binary file data source
Xiangrui Meng created SPARK-27473: - Summary: Support filter push down for status fields in binary file data source Key: SPARK-27473 URL: https://issues.apache.org/jira/browse/SPARK-27473 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Xiangrui Meng As a user, I can use `spark.read.format("binaryFile").load(path).filter($"status.length" < 100000000L)` to load files that are less than 1e8 bytes. Spark shouldn't even read files that are bigger than 1e8 bytes in this case.
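The intent can be sketched in plain Python (illustrative names only, not Spark's data source code): a pushed-down `status.length` predicate is evaluated against file metadata alone, so files failing it are never opened or read.

```python
# Sketch of the intended push-down: evaluate the length predicate
# against file metadata (os.stat) alone, so files that fail it are
# never opened. All names here are illustrative, not Spark's.
import os
import tempfile

def list_files_with_length_filter(paths, max_length):
    """Return only paths whose on-disk size is < max_length,
    using metadata only, without reading any file content."""
    return [p for p in paths if os.stat(p).st_size < max_length]

with tempfile.TemporaryDirectory() as d:
    small = os.path.join(d, "small.bin")
    with open(small, "wb") as f:
        f.write(b"x" * 10)
    big = os.path.join(d, "big.bin")
    with open(big, "wb") as f:
        f.write(b"x" * 1000)
    kept = list_files_with_length_filter([small, big], max_length=100)
    print(kept == [small])  # only the small file survives the filter
```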
[jira] [Updated] (SPARK-25348) Data source for binary files
[ https://issues.apache.org/jira/browse/SPARK-25348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiangrui Meng updated SPARK-25348: -- Component/s: (was: ML) > Data source for binary files > > > Key: SPARK-25348 > URL: https://issues.apache.org/jira/browse/SPARK-25348 > Project: Spark > Issue Type: Story > Components: SQL >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Weichen Xu >Priority: Major > > It would be useful to have a data source implementation for binary files, > which can be used to build features to load images, audio, and videos. > Microsoft has an implementation at > [https://github.com/Azure/mmlspark/tree/master/src/io/binary.] It would be > great if we can merge it into Spark main repo. > cc: [~mhamilton] and [~imatiach] > Proposed API: > Format name: "binaryFile" > Schema: > * content: BinaryType > * status (following Hadoop FIleStatus): > ** path: StringType > ** modificationTime: Timestamp > ** length: LongType (size limit 2GB) > Options: > * pathGlobFilter: only include files with path matching the glob pattern > Input partition size can be controlled by common SQL confs: maxPartitionBytes > and openCostInBytes -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27472) Document binary file data source in Spark user guide
Xiangrui Meng created SPARK-27472: - Summary: Document binary file data source in Spark user guide Key: SPARK-27472 URL: https://issues.apache.org/jira/browse/SPARK-27472 Project: Spark Issue Type: Documentation Components: Documentation, SQL Affects Versions: 3.0.0 Reporter: Xiangrui Meng We should add the binary file data source to https://spark.apache.org/docs/latest/sql-data-sources.html after SPARK-25348.
[jira] [Updated] (SPARK-25348) Data source for binary files
[ https://issues.apache.org/jira/browse/SPARK-25348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiangrui Meng updated SPARK-25348: -- Description: It would be useful to have a data source implementation for binary files, which can be used to build features to load images, audio, and videos. Microsoft has an implementation at [https://github.com/Azure/mmlspark/tree/master/src/io/binary.] It would be great if we can merge it into the Spark main repo. cc: [~mhamilton] and [~imatiach] Proposed API: Format name: "binaryFile" Schema: * content: BinaryType * status (following Hadoop FileStatus): ** path: StringType ** modificationTime: Timestamp ** length: LongType (size limit 2GB) Options: * pathGlobFilter: only include files with path matching the glob pattern Input partition size can be controlled by common SQL confs: maxPartitionBytes and openCostInBytes was: It would be useful to have a data source implementation for binary files, which can be used to build features to load images, audio, and videos. Microsoft has an implementation at [https://github.com/Azure/mmlspark/tree/master/src/io/binary.] It would be great if we can merge it into the Spark main repo. cc: [~mhamilton] and [~imatiach] Proposed API: Format name: "binary-file" Schema: * content: BinaryType * status (following Hadoop FileStatus): ** path: StringType ** modification_time: Timestamp ** length: LongType (size limit 2GB) Options: * pathFilterRegex: only include files with path matching the regex pattern * maxBytesPerPartition: The max total file size for each partition unless the partition only contains one file We will also add `binaryFile` to `DataFrameReader` and `DataStreamReader` as convenience aliases. 
> Data source for binary files > > > Key: SPARK-25348 > URL: https://issues.apache.org/jira/browse/SPARK-25348 > Project: Spark > Issue Type: Story > Components: ML, SQL >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Weichen Xu >Priority: Major > > It would be useful to have a data source implementation for binary files, > which can be used to build features to load images, audio, and videos. > Microsoft has an implementation at > [https://github.com/Azure/mmlspark/tree/master/src/io/binary.] It would be > great if we can merge it into Spark main repo. > cc: [~mhamilton] and [~imatiach] > Proposed API: > Format name: "binaryFile" > Schema: > * content: BinaryType > * status (following Hadoop FIleStatus): > ** path: StringType > ** modificationTime: Timestamp > ** length: LongType (size limit 2GB) > Options: > * pathGlobFilter: only include files with path matching the glob pattern > Input partition size can be controlled by common SQL confs: maxPartitionBytes > and openCostInBytes -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27465) Kafka Client 0.11.0.0 is not Supporting the kafkatestutils package
[ https://issues.apache.org/jira/browse/SPARK-27465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Praveen updated SPARK-27465: Affects Version/s: 2.3.0 2.3.1 2.3.2 2.4.0 2.4.1 > Kafka Client 0.11.0.0 is not Supporting the kafkatestutils package > -- > > Key: SPARK-27465 > URL: https://issues.apache.org/jira/browse/SPARK-27465 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.4.0, 2.4.1 >Reporter: Praveen >Priority: Critical > > Hi Team, > We are getting the below exceptions with Kafka Client Version 0.11.0.0 for > the KafkaTestUtils package. But it's working fine when we use Kafka Client > Version 0.10.0.1. Please suggest the way forward. We are using the package " > import org.apache.spark.streaming.kafka010.KafkaTestUtils;" > > ERROR: > java.lang.NoSuchMethodError: > kafka.server.KafkaServer$.$lessinit$greater$default$2()Lkafka/utils/Time; > at > org.apache.spark.streaming.kafka010.KafkaTestUtils$$anonfun$setupEmbeddedKafkaServer$2.apply(KafkaTestUtils.scala:110) > at > org.apache.spark.streaming.kafka010.KafkaTestUtils$$anonfun$setupEmbeddedKafkaServer$2.apply(KafkaTestUtils.scala:107) > at > org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2234) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2226) > at > org.apache.spark.streaming.kafka010.KafkaTestUtils.setupEmbeddedKafkaServer(KafkaTestUtils.scala:107) > at > org.apache.spark.streaming.kafka010.KafkaTestUtils.setup(KafkaTestUtils.scala:122) > at > com.netcracker.rms.smart.esp.ESPTestEnv.prepareKafkaTestUtils(ESPTestEnv.java:203) > at com.netcracker.rms.smart.esp.ESPTestEnv.setUp(ESPTestEnv.java:157) > at > com.netcracker.rms.smart.esp.TestEventStreamProcessor.setUp(TestEventStreamProcessor.java:58)
[jira] [Updated] (SPARK-27465) Kafka Client 0.11.0.0 is not Supporting the kafkatestutils package
[ https://issues.apache.org/jira/browse/SPARK-27465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Praveen updated SPARK-27465: Priority: Critical (was: Major) > Kafka Client 0.11.0.0 is not Supporting the kafkatestutils package > -- > > Key: SPARK-27465 > URL: https://issues.apache.org/jira/browse/SPARK-27465 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 2.3.3 >Reporter: Praveen >Priority: Critical > > Hi Team, > We are getting the below exceptions with Kafka Client Version 0.11.0.0 for > the KafkaTestUtils package. But it's working fine when we use Kafka Client > Version 0.10.0.1. Please suggest the way forward. We are using the package " > import org.apache.spark.streaming.kafka010.KafkaTestUtils;" > > ERROR: > java.lang.NoSuchMethodError: > kafka.server.KafkaServer$.$lessinit$greater$default$2()Lkafka/utils/Time; > at > org.apache.spark.streaming.kafka010.KafkaTestUtils$$anonfun$setupEmbeddedKafkaServer$2.apply(KafkaTestUtils.scala:110) > at > org.apache.spark.streaming.kafka010.KafkaTestUtils$$anonfun$setupEmbeddedKafkaServer$2.apply(KafkaTestUtils.scala:107) > at > org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2234) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2226) > at > org.apache.spark.streaming.kafka010.KafkaTestUtils.setupEmbeddedKafkaServer(KafkaTestUtils.scala:107) > at > org.apache.spark.streaming.kafka010.KafkaTestUtils.setup(KafkaTestUtils.scala:122) > at > com.netcracker.rms.smart.esp.ESPTestEnv.prepareKafkaTestUtils(ESPTestEnv.java:203) > at com.netcracker.rms.smart.esp.ESPTestEnv.setUp(ESPTestEnv.java:157) > at > com.netcracker.rms.smart.esp.TestEventStreamProcessor.setUp(TestEventStreamProcessor.java:58)
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818598#comment-16818598 ] Kevin Zhang commented on SPARK-24630: - Thanks [~Jackey Lee]. So I'm wondering what's blocking the PR for this issue from being merged; is it related to DataSourceV2? > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP V2.pdf > > > At present, KafkaSQL, Flink SQL (which is actually based on Calcite), > SQLStream, and StormSQL all provide a stream-type SQL interface, with which users > with little knowledge of streaming can easily develop a stream > processing model. In Spark, we can also support a SQL API based on > Structured Streaming. > To support SQL Streaming, there are two key points: > 1. The parser should be able to parse streaming-type SQL. > 2. The analyzer should be able to map metadata information to the corresponding > Relation.
[jira] [Resolved] (SPARK-27436) Add spark.sql.optimizer.nonExcludedRules
[ https://issues.apache.org/jira/browse/SPARK-27436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-27436. --- Resolution: Won't Do > Add spark.sql.optimizer.nonExcludedRules > > > Key: SPARK-27436 > URL: https://issues.apache.org/jira/browse/SPARK-27436 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > > This issue aims to add `spark.sql.optimizer.nonExcludedRules` static > configuration to prevent accidental rule exclusion by users in SQL > environment dynamically.
[jira] [Created] (SPARK-27471) Reorganize public v2 catalog API
Ryan Blue created SPARK-27471: - Summary: Reorganize public v2 catalog API Key: SPARK-27471 URL: https://issues.apache.org/jira/browse/SPARK-27471 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.1 Reporter: Ryan Blue Fix For: 3.0.0 In the review for SPARK-27181, Reynold suggested some package moves. We've decided (at the v2 community sync) not to delay by having this discussion now because we want to get the new catalog API in so we can work on more logical plans in parallel. But we do need to make sure we have a sane package scheme for the next release.
[jira] [Commented] (SPARK-27386) Improve partition transform parsing
[ https://issues.apache.org/jira/browse/SPARK-27386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818457#comment-16818457 ] Reynold Xin commented on SPARK-27386: - [~rdblue] when will you fix this? > Improve partition transform parsing > --- > > Key: SPARK-27386 > URL: https://issues.apache.org/jira/browse/SPARK-27386 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Ryan Blue >Priority: Major > > SPARK-27181 adds support to the SQL parser for transformation functions in > the {{PARTITION BY}} clause. The rules to match this are specific to > transforms and can match only literals or qualified names (field references). > This should be improved to match a broader set of expressions so that Spark > can produce better error messages than an expected symbol list. > For example, {{PARTITION BY (2 + 3)}} should produce "invalid transformation > expression: 2 + 3" instead of "expecting qualified name".
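The suggested improvement can be illustrated outside Spark's actual ANTLR grammar. In this hedged Python sketch (all names hypothetical), the parser accepts a broad token first and then validates it, which lets it report the offending expression instead of an expected-symbol list:

```python
# Illustrative sketch (not Spark's grammar): accept a broad
# expression first, then validate it, so the error can name the
# offending expression rather than listing expected symbols.
import re

def parse_transform_arg(text):
    """Accept only literals or qualified names (e.g. `ts`, `a.b.c`,
    `16`); anything else is reported as an invalid transform
    expression, quoting the text the user wrote."""
    token = text.strip()
    is_literal = re.fullmatch(r"\d+|'[^']*'", token) is not None
    is_qualified_name = re.fullmatch(
        r"[A-Za-z_]\w*(\.[A-Za-z_]\w*)*", token) is not None
    if not (is_literal or is_qualified_name):
        raise ValueError(f"invalid transformation expression: {token}")
    return token

print(parse_transform_arg("ts"))  # a qualified name passes through
try:
    parse_transform_arg("2 + 3")
except ValueError as e:
    print(e)  # the message names the bad expression itself
```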
[jira] [Resolved] (SPARK-27351) Wrong outputRows estimation after AggregateEstimation with only null value column
[ https://issues.apache.org/jira/browse/SPARK-27351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-27351. --- Resolution: Fixed Assignee: peng bo Fix Version/s: 3.0.0 2.4.2 This is resolved via https://github.com/apache/spark/pull/24286 > Wrong outputRows estimation after AggregateEstimation with only null value > column > - > > Key: SPARK-27351 > URL: https://issues.apache.org/jira/browse/SPARK-27351 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.1 >Reporter: peng bo >Assignee: peng bo >Priority: Major > Fix For: 2.4.2, 3.0.0 > > > The upper bound of the group-by output row count is the product of the distinct > counts of the group-by columns. However, a column with only null values will cause > the output row number to be 0, which is incorrect. > Ex: > col1 (distinct: 2, rowCount 2) > col2 (distinct: 0, rowCount 2) > group by col1, col2 > Actual: output rows: 0 > Expected: output rows: 2 > {code:java} > var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))( > (res, expr) => res * > childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount) > {code}
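The failure mode, and one possible remedy, can be sketched independently of Spark (this mirrors the idea behind the fix but is not the actual patch): clamp each per-column distinct count to at least 1, since an all-null column still forms one group, and cap the product at the child row count.

```python
# Sketch of the estimation bug and a possible fix (not Spark's
# actual patch): the upper bound on group-by output rows multiplies
# per-column distinct counts, but a column containing only nulls has
# distinctCount 0, which zeroes the whole product. Clamping each
# factor to at least 1 (the nulls form one group) avoids that.
from functools import reduce

def estimate_output_rows(distinct_counts, row_count):
    # res * max(distinctCount, 1) instead of res * distinctCount
    upper = reduce(lambda res, d: res * max(d, 1), distinct_counts, 1)
    # the aggregate can never emit more rows than its child has
    return min(upper, row_count)

# The example from this issue:
# col1 distinct 2, col2 all nulls (distinct 0), child rowCount 2
print(estimate_output_rows([2, 0], row_count=2))  # 2 rather than 0
```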
[jira] [Updated] (SPARK-27452) Update zstd-jni to 1.3.8-9
[ https://issues.apache.org/jira/browse/SPARK-27452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-27452: -- Summary: Update zstd-jni to 1.3.8-9 (was: Update zstd-jni to 1.3.8-7) > Update zstd-jni to 1.3.8-9 > -- > > Key: SPARK-27452 > URL: https://issues.apache.org/jira/browse/SPARK-27452 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.0.0 >Reporter: Dongjoon Hyun >Priority: Minor > > This issue updates `zstd-jni` from 1.3.2-2 to 1.3.8-9 to be aligned with the > latest Zstd 1.3.8 seamlessly.
[jira] [Assigned] (SPARK-27452) Update zstd-jni to 1.3.8-9
[ https://issues.apache.org/jira/browse/SPARK-27452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun reassigned SPARK-27452: - Assignee: Dongjoon Hyun > Update zstd-jni to 1.3.8-9 > -- > > Key: SPARK-27452 > URL: https://issues.apache.org/jira/browse/SPARK-27452 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.0.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Minor > > This issue updates `zstd-jni` from 1.3.2-2 to 1.3.8-9 to be aligned with the > latest Zstd 1.3.8 seamlessly.
[jira] [Created] (SPARK-27470) Upgrade pyrolite to 4.23
Sean Owen created SPARK-27470: - Summary: Upgrade pyrolite to 4.23 Key: SPARK-27470 URL: https://issues.apache.org/jira/browse/SPARK-27470 Project: Spark Issue Type: Improvement Components: PySpark, Spark Core Affects Versions: 2.4.1, 3.0.0 Reporter: Sean Owen Assignee: Sean Owen We can/should upgrade the pyrolite dependency to the latest, 4.23, to pick up bug and security fixes.
[jira] [Commented] (SPARK-24717) Split out min retain version of state for memory in HDFSBackedStateStoreProvider
[ https://issues.apache.org/jira/browse/SPARK-24717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818326#comment-16818326 ] Stavros Kontopoulos commented on SPARK-24717: - [~tdas] What is the point of having `spark.sql.streaming.minBatchesToRetain` set to 100 by default? Wouldn't that create problems with large states when it comes to external storage? > Split out min retain version of state for memory in > HDFSBackedStateStoreProvider > > > Key: SPARK-24717 > URL: https://issues.apache.org/jira/browse/SPARK-24717 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jungtaek Lim >Assignee: Jungtaek Lim >Priority: Major > Fix For: 2.4.0 > > > HDFSBackedStateStoreProvider has only one configuration for the minimum number of > state versions to retain, which applies to both the memory cache and files. The > default of "spark.sql.streaming.minBatchesToRetain" is high (100); this doesn't > strictly require 100x of memory, but I'm seeing 10x ~ 80x of > memory consumption for various workloads. In addition, in some cases, > requiring 2x of memory is even unacceptable, so we should split out the > configuration for memory and let users adjust the trade-off of memory usage vs > cache misses. > In the normal case, the default value '2' would cover both cases: success and > restoring from failure with less than or around 2x of memory usage, and '1' would > only cover the success case but no longer require more than 1x of memory. In the > extreme case, users can set the value to '0' to completely disable the map > cache and maximize executor memory.
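The split being discussed can be sketched as follows (hypothetical knob names; this is not the HDFSBackedStateStoreProvider implementation): one limit governs how many state versions stay on storage, and a smaller, independent limit governs the in-memory map cache.

```python
# Sketch of the proposed split (hypothetical parameter names): retain
# `min_versions_files` committed state versions on storage, but keep
# only the newest `min_versions_memory` of them in the in-memory map
# cache, so memory usage no longer tracks the file-retention knob.
from collections import OrderedDict

class StateRetention:
    def __init__(self, min_versions_files=100, min_versions_memory=2):
        self.min_versions_files = min_versions_files
        self.min_versions_memory = min_versions_memory
        self.files = OrderedDict()   # version -> snapshot on "disk"
        self.cache = OrderedDict()   # version -> snapshot in memory

    def commit(self, version, snapshot):
        self.files[version] = snapshot
        self.cache[version] = snapshot
        while len(self.files) > self.min_versions_files:
            self.files.popitem(last=False)   # drop oldest stored version
        while len(self.cache) > self.min_versions_memory:
            self.cache.popitem(last=False)   # drop oldest cached map

store = StateRetention(min_versions_files=5, min_versions_memory=2)
for v in range(10):
    store.commit(v, {"count": v})
# storage keeps the last 5 versions; memory keeps only the last 2
print(sorted(store.files), sorted(store.cache))
```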
[jira] [Updated] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Martin updated SPARK-27463: - Description: Recent work on Pandas UDFs in Spark has allowed for improved interoperability between Pandas and Spark. This proposal aims to extend this by introducing a new Pandas UDF type which would allow for a cogroup operation to be applied to two PySpark DataFrames. Full details are in the Google document linked below. was: h2. *Background and Motivation* Recently there has been a great deal of work in PySpark to improve interoperability with the Pandas library. This work has allowed users to write User Defined Functions (UDFs) in Pandas which can then be applied to a Spark DataFrame. The benefit here is that it allows users to combine the functionality of Pandas with the parallelisation abilities of Spark. In addition, these new Pandas UDFs have significantly lower overhead than traditional UDFs as they operate on a batch of data at a time (i.e. they are vectorised) and they use Apache Arrow for serialisation between the JVM and Python processes. As of Spark 2.3, two types of Pandas UDF are offered. Scalar UDFs effectively offer a map operation at the row level, while Grouped Map UDFs allow a map operation on a group of data. This functionality has proved successful in allowing users to integrate Spark with existing Pandas workflows; however, there are situations where the existing functionality offered is not sufficient. One such case is analogous to the existing Cogroup functionality available on RDDs and DataSets and was proposed by Li Jin on the Spark-Dev mailing list [1]. In this case, the user would like to group two Spark DataFrames by a common key and then apply a Python function to each group. This Python function would take two pandas DataFrames as its arguments and would return an arbitrary-length Pandas DataFrame. 
To give a concrete example of the usefulness of this functionality, consider the use case of performing an as-of join between two distinct DataFrames. This is something that has traditionally been very difficult to do in Spark (and indeed in SQL in general)[2] but which has good support in Pandas[3]. If Cogroup-like functionality were available in PySpark then one could simply write a Pandas function to perform the as-of joining, which could then be applied to two (appropriately grouped) DataFrames. This proposal therefore advocates introducing a new API call which would allow for a Cogrouped Pandas UDF. [1][http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3ccagy9duxt569bpgp0wsc2esjgcoo5+hbfihfbkofcocclmjh...@mail.gmail.com%3e] [2]see https://issues.apache.org/jira/browse/SPARK-22947 for a SPIP that aims to add asof join functionality to Spark. [3][https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html] h2. *API Changes* The public API changes would all be on the PySpark side. In terms of the API itself there are a couple of options, depending on whether the goal is syntactic brevity or consistency with the DataSet version of cogroup. If brevity is the aim then a new method can be added to the DataFrame class: {code:java} # other is another DataFrame, on is the cogroup key, udf is the function to apply. def cogroup(self, other, on, udf){code} Alternatively, to be consistent with the DataSet version of cogroup, a new method could be added to the GroupedData class. {code:java} # other is another GroupedData, udf is the function to apply. def cogroup(self, other, udf){code} The exact API can be worked out as part of this SPIP and the document will be updated once a decision has been reached. In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify this new type of UDF. Functions annotated with this decorator should take two Pandas DataFrames and return a single Pandas DataFrame. 
Here is an example of usage, using the as-of join use case described earlier and the first option for the API syntax. {code:java} @pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP) # df1, df2 and function return are all pandas.DataFrames def asof_join(df1, df2): return pd.merge_asof(df1, df2, on='time') df1.cogroup(df2, on='product_id', apply=asof_join){code} h2. *Target Personas* Data scientists, data engineers, library developers. h2. *Scope* * Initial implementation will only consider the case of Cogrouping exactly two DataFrames. Further work may extend this to the case of multiple DataFrames * API call is to be made available via PySpark only. No equivalent R/Java/Scala functionality will be offered. h2. *Design* * New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark * New public method to be added to either GroupedData or DataFrame to expose cogroup in Pyspark * New package private method to be added to RelationGroupedDataset to allow
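For readers unfamiliar with as-of join semantics, here is a minimal pure-Python sketch of the pairing that pandas.merge_asof computes (a backward as-of join). It is illustrative only and not part of the proposal:

```python
# Pure-Python sketch of as-of join semantics: for each left timestamp,
# pick the value of the most recent right row whose time is <= it.
# Both inputs are assumed sorted ascending by time.
def asof_join(left_times, right):
    # right: list of (time, value) pairs sorted by time
    out = []
    j = -1  # index of the latest right row already at or before left time
    for t in left_times:
        while j + 1 < len(right) and right[j + 1][0] <= t:
            j += 1
        out.append((t, right[j][1] if j >= 0 else None))
    return out

quotes = [(1, 100.0), (4, 101.5), (9, 99.8)]
print(asof_join([2, 4, 7, 10], quotes))
# [(2, 100.0), (4, 101.5), (7, 101.5), (10, 99.8)]
```

Each left timestamp is matched with the latest right row at or before it, which is exactly the per-group pairing a cogrouped Pandas UDF could delegate to pd.merge_asof.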
[jira] [Commented] (SPARK-25250) Race condition with tasks running when new attempt for same stage is created leads to other task in the next attempt running on the same partition id retry multiple ti
[ https://issues.apache.org/jira/browse/SPARK-25250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818318#comment-16818318 ] Imran Rashid commented on SPARK-25250: -- I agree about opening a new jira. Wenchen discussed reverting it here: https://github.com/apache/spark/pull/24359 I agree we made a major mistake in that fix. I don't care too much about how the commits look in git, I am fine with having a revert followed by a different fix, rather than rolling it into one change. > Race condition with tasks running when new attempt for same stage is created > leads to other task in the next attempt running on the same partition id > retry multiple times > -- > > Key: SPARK-25250 > URL: https://issues.apache.org/jira/browse/SPARK-25250 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Affects Versions: 2.3.1 >Reporter: Parth Gandhi >Assignee: Parth Gandhi >Priority: Major > Fix For: 2.3.4, 2.4.1, 3.0.0 > > > We recently had a scenario where a race condition occurred when a task from > previous stage attempt just finished before new attempt for the same stage > was created due to fetch failure, so the new task created in the second > attempt on the same partition id was retrying multiple times due to > TaskCommitDenied Exception without realizing that the task in earlier attempt > was already successful. > For example, consider a task with partition id 9000 and index 9000 running in > stage 4.0. We see a fetch failure so thus, we spawn a new stage attempt 4.1. > Just within this timespan, the above task completes successfully, thus, > marking the partition id 9000 as complete for 4.0. However, as stage 4.1 has > not yet been created, the taskset info for that stage is not available to the > TaskScheduler so, naturally, the partition id 9000 has not been marked > completed for 4.1. Stage 4.1 now spawns task with index 2000 on the same > partition id 9000. 
This task fails due to CommitDeniedException and, since it > does not see the corresponding partition id as having been marked successful, it > keeps retrying multiple times until the job finally succeeds. It doesn't > cause any job failures because the DAG scheduler tracks the partitions > separately from the task set managers. > > Steps to Reproduce: > # Run any large job involving a shuffle operation. > # When the ShuffleMap stage finishes and the ResultStage begins running, > cause this stage to throw a fetch failure exception (try deleting certain > shuffle files on any host). > # Observe the task attempt numbers for the next stage attempt. Please note > that this issue is an intermittent one, so it might not happen all the time.
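The core of the race described above can be sketched in a few lines of plain Python (class and field names are hypothetical, not Spark's actual scheduler code): each stage attempt tracks its completed partitions independently, so a completion recorded just before the retry is invisible to the new attempt.

```python
# Toy sketch (names hypothetical): each stage attempt keeps its own
# view of completed partitions, so attempt 1 re-launches a partition
# that already succeeded under attempt 0.
class TaskSetManager:
    def __init__(self, stage, attempt):
        self.stage, self.attempt = stage, attempt
        self.completed_partitions = set()

attempt0 = TaskSetManager(stage=4, attempt=0)
attempt0.completed_partitions.add(9000)   # task finished just before retry

# A fetch failure triggers stage attempt 4.1, created with empty state:
attempt1 = TaskSetManager(stage=4, attempt=1)
print(9000 in attempt1.completed_partitions)  # False -> task re-launched
```

Because the DAG scheduler's partition bookkeeping is separate from these per-attempt views, the job still completes; only the redundant retries are observed.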
[jira] [Assigned] (SPARK-27454) Spark image datasource fail when encounter some illegal images
[ https://issues.apache.org/jira/browse/SPARK-27454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiangrui Meng reassigned SPARK-27454: - Assignee: Weichen Xu > Spark image datasource fail when encounter some illegal images > -- > > Key: SPARK-27454 > URL: https://issues.apache.org/jira/browse/SPARK-27454 > Project: Spark > Issue Type: Bug > Components: ML, SQL >Affects Versions: 2.4.1 >Reporter: Weichen Xu >Assignee: Weichen Xu >Priority: Major > > Spark image datasource fail when encounter some illegal images. Such as > exception following: > {code:java} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 > (TID 132, 10.95.233.69, executor 0): java.lang.IllegalArgumentException: > Numbers of source Raster bands and source color space components do not match: > at java.awt.image.ColorConvertOp.filter(ColorConvertOp.java:482) at > com.sun.imageio.plugins.jpeg.JPEGImageReader.acceptPixels(JPEGImageReader.java:1280) > at com.sun.imageio.plugins.jpeg.JPEGImageReader.readImage(Native Method) at > com.sun.imageio.plugins.jpeg.JPEGImageReader.readInternal(JPEGImageReader.java:1247) > at > com.sun.imageio.plugins.jpeg.JPEGImageReader.read(JPEGImageReader.java:1050) > at javax.imageio.ImageIO.read(ImageIO.java:1448) at > javax.imageio.ImageIO.read(ImageIO.java:1352) at > org.apache.spark.ml.image.ImageSchema$.decode(ImageSchema.scala:136) at > org.apache.spark.ml.source.image.ImageFileFormat$$anonfun$buildReader$2.apply(ImageFileFormat.scala:84) > at > org.apache.spark.ml.source.image.ImageFileFormat$$anonfun$buildReader$2.apply(ImageFileFormat.scala:70) > at > org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:147) > at > org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:134) > at > 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:226) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:196) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:338) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:196) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown > Source) at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:638) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at > org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at > org.apache.spark.scheduler.Task.run(Task.scala:112) at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1481) at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748){code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: 
issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
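One way to make a datasource tolerant of such images is to catch the decode failure and emit a null/invalid row instead of failing the whole task. The sketch below illustrates the idea in plain Python; it is not the actual Spark fix, and the decoder names and toy magic-number check are hypothetical.

```python
# Sketch (not the actual Spark fix): a tolerant decode wrapper that maps
# an undecodable image to None instead of throwing and killing the task.
def safe_decode(raw_bytes, decoder):
    try:
        return decoder(raw_bytes)
    except (ValueError, OSError):
        # Illegal/corrupt image: represent it as an "invalid image" marker
        return None

def strict_decoder(b):
    # Toy stand-in for ImageIO.read: reject anything that is not a "JPEG"
    if not b.startswith(b"\xff\xd8"):
        raise ValueError("not a JPEG")
    return {"mode": "RGB", "nbytes": len(b)}

print(safe_decode(b"\xff\xd8\x00", strict_decoder))  # decoded dict
print(safe_decode(b"garbage", strict_decoder))       # None
```

A scan can then keep processing the remaining files and surface the undecodable ones as null rows rather than aborting the stage after repeated task failures.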
[jira] [Resolved] (SPARK-27454) Spark image datasource fail when encounter some illegal images
[ https://issues.apache.org/jira/browse/SPARK-27454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiangrui Meng resolved SPARK-27454. --- Resolution: Fixed Fix Version/s: 3.0.0 > Spark image datasource fail when encounter some illegal images > -- > > Key: SPARK-27454 > URL: https://issues.apache.org/jira/browse/SPARK-27454 > Project: Spark > Issue Type: Bug > Components: ML, SQL >Affects Versions: 2.4.1 >Reporter: Weichen Xu >Assignee: Weichen Xu >Priority: Major > Fix For: 3.0.0 > > > Spark image datasource fail when encounter some illegal images. Such as > exception following: > {code:java} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 > (TID 132, 10.95.233.69, executor 0): java.lang.IllegalArgumentException: > Numbers of source Raster bands and source color space components do not match: > at java.awt.image.ColorConvertOp.filter(ColorConvertOp.java:482) at > com.sun.imageio.plugins.jpeg.JPEGImageReader.acceptPixels(JPEGImageReader.java:1280) > at com.sun.imageio.plugins.jpeg.JPEGImageReader.readImage(Native Method) at > com.sun.imageio.plugins.jpeg.JPEGImageReader.readInternal(JPEGImageReader.java:1247) > at > com.sun.imageio.plugins.jpeg.JPEGImageReader.read(JPEGImageReader.java:1050) > at javax.imageio.ImageIO.read(ImageIO.java:1448) at > javax.imageio.ImageIO.read(ImageIO.java:1352) at > org.apache.spark.ml.image.ImageSchema$.decode(ImageSchema.scala:136) at > org.apache.spark.ml.source.image.ImageFileFormat$$anonfun$buildReader$2.apply(ImageFileFormat.scala:84) > at > org.apache.spark.ml.source.image.ImageFileFormat$$anonfun$buildReader$2.apply(ImageFileFormat.scala:70) > at > org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:147) > at > org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:134) > at > 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:226) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:196) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:338) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:196) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown > Source) at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:638) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at > org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at > org.apache.spark.scheduler.Task.run(Task.scala:112) at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1481) at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748){code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: 
issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27469) Update Commons BeanUtils to 1.9.3
Sean Owen created SPARK-27469: - Summary: Update Commons BeanUtils to 1.9.3 Key: SPARK-27469 URL: https://issues.apache.org/jira/browse/SPARK-27469 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.1, 3.0.0 Reporter: Sean Owen Assignee: Sean Owen Right now, Spark inherits two inconsistent versions of Commons BeanUtils via Hadoop: commons-beanutils 1.7.0 and commons-beanutils-core 1.8.0. Version 1.9.3 is the latest, and resolves bugs and the deserialization vulnerability described in CVE-2017-12612. It'd be nice to both fix the inconsistency and get the latest version to further ensure that there isn't any latent vulnerability here.
[jira] [Commented] (SPARK-27468) "Storage Level" in "RDD Storage Page" is not correct
[ https://issues.apache.org/jira/browse/SPARK-27468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818197#comment-16818197 ] Xiao Li commented on SPARK-27468: - cc [~Gengliang.Wang] > "Storage Level" in "RDD Storage Page" is not correct > > > Key: SPARK-27468 > URL: https://issues.apache.org/jira/browse/SPARK-27468 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.1 >Reporter: Shixiong Zhu >Priority: Major > > I ran the following unit test and checked the UI. > {code} > val conf = new SparkConf() > .setAppName("test") > .setMaster("local-cluster[2,1,1024]") > .set("spark.ui.enabled", "true") > sc = new SparkContext(conf) > val rdd = sc.makeRDD(1 to 10, 1).persist(StorageLevel.MEMORY_ONLY_2) > rdd.count() > Thread.sleep(360) > {code} > The storage level is "Memory Deserialized 1x Replicated" in the RDD storage > page. > I tried to debug and found this is because Spark emitted the following two > events: > {code} > event: SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(1, > 10.8.132.160, 65473, None),rdd_0_0,StorageLevel(memory, deserialized, 2 > replicas),56,0)) > event: SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(0, > 10.8.132.160, 65474, None),rdd_0_0,StorageLevel(memory, deserialized, 1 > replicas),56,0)) > {code} > The storage level in the second event will overwrite the first one. "1 > replicas" comes from this line: > https://github.com/apache/spark/blob/3ab96d7acf870e53c9016b0b63d0b328eec23bed/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1457 > Maybe AppStatusListener should calculate the replicas from events? > Another fact we may need to think about is when replicas is 2, will two Spark > events arrive in the same order? Currently, two RPCs from different executors > can arrive in any order. 
[jira] [Created] (SPARK-27468) "Storage Level" in "RDD Storage Page" is not correct
Shixiong Zhu created SPARK-27468: Summary: "Storage Level" in "RDD Storage Page" is not correct Key: SPARK-27468 URL: https://issues.apache.org/jira/browse/SPARK-27468 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.1 Reporter: Shixiong Zhu I ran the following unit test and checked the UI. {code} val conf = new SparkConf() .setAppName("test") .setMaster("local-cluster[2,1,1024]") .set("spark.ui.enabled", "true") sc = new SparkContext(conf) val rdd = sc.makeRDD(1 to 10, 1).persist(StorageLevel.MEMORY_ONLY_2) rdd.count() Thread.sleep(360) {code} The storage level is "Memory Deserialized 1x Replicated" in the RDD storage page. I tried to debug and found this is because Spark emitted the following two events: {code} event: SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(1, 10.8.132.160, 65473, None),rdd_0_0,StorageLevel(memory, deserialized, 2 replicas),56,0)) event: SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(0, 10.8.132.160, 65474, None),rdd_0_0,StorageLevel(memory, deserialized, 1 replicas),56,0)) {code} The storage level in the second event will overwrite the first one. "1 replicas" comes from this line: https://github.com/apache/spark/blob/3ab96d7acf870e53c9016b0b63d0b328eec23bed/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1457 Maybe AppStatusListener should calculate the replicas from events? Another fact we may need to think about is when replicas is 2, will two Spark events arrive in the same order? Currently, two RPCs from different executors can arrive in any order. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
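The suggestion above, that AppStatusListener derive the replica count from the events themselves rather than trusting whichever storage level arrives last, can be sketched like this (plain Python, illustrative only, not Spark code):

```python
# Sketch of the proposed fix: instead of letting the last
# SparkListenerBlockUpdated event overwrite the replica count, count the
# distinct block managers (executors) that reported holding the block.
from collections import defaultdict

block_holders = defaultdict(set)  # block id -> executor ids holding it

def on_block_updated(block_id, executor_id):
    block_holders[block_id].add(executor_id)

# The two events from the report, which may arrive in either order:
on_block_updated("rdd_0_0", "1")
on_block_updated("rdd_0_0", "0")
print(len(block_holders["rdd_0_0"]))  # 2 replicas, order-independent
```

Because the result is a set union, the derived replica count is the same no matter which RPC arrives first, sidestepping the event-ordering question raised at the end of the report.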
[jira] [Updated] (SPARK-27458) Remind developer using IntelliJ to update maven version
[ https://issues.apache.org/jira/browse/SPARK-27458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-27458: -- Priority: Minor (was: Major) > Remind developer using IntelliJ to update maven version > --- > > Key: SPARK-27458 > URL: https://issues.apache.org/jira/browse/SPARK-27458 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 3.0.0 >Reporter: William Wong >Priority: Minor > > I am using IntelliJ to update a few Spark source files. I tried to follow the > guide at '[http://spark.apache.org/developer-tools.html]' to set up an > IntelliJ project for Spark. However, the project failed to build, due to > missing classes generated via antlr in the sql/catalyst project. I tried > clicking the button 'Generate Sources and Update Folders for all Projects', > but it did not help; the antlr task was not triggered as expected. > I checked the IntelliJ log file and found that this was because I had not set > the maven version properly, and the 'Generate Sources and Update Folders for > all Projects' process failed silently: > > _2019-04-14 16:05:24,796 [ 314609] INFO - #org.jetbrains.idea.maven - > [WARNING] Rule 0: org.apache.maven.plugins.enforcer.RequireMavenVersion > failed with message:_ > _Detected Maven Version: 3.3.9 is not in the allowed range 3.6.0._ > _2019-04-14 16:05:24,813 [ 314626] INFO - #org.jetbrains.idea.maven - > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute > goal org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M2:enforce > (enforce-versions) on project spark-parent_2.12: Some Enforcer rules have > failed. Look above for specific messages explaining why the rule failed._ > _java.lang.RuntimeException: > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute > goal org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M2:enforce > (enforce-versions) on project spark-parent_2.12: Some Enforcer rules have > failed. 
Look above for specific messages explaining why the rule failed._ > > To be honest, failing an action silently should be considered an IntelliJ bug. > However, enhancing the page '[http://spark.apache.org/developer-tools.html]' to > remind developers to check the maven version may save new joiners some > time. > >
[jira] [Updated] (SPARK-27467) Upgrade Maven to 3.6.1
[ https://issues.apache.org/jira/browse/SPARK-27467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-27467: -- Description: This issue aims to upgrade Maven to 3.6.1 to bring JDK9+ patches like MNG-6506. For the full release note, please see the following. https://maven.apache.org/docs/3.6.1/release-notes.html was: This issue aims to upgrade Maven to 3.6.1 to bring JDK9+ patches like MNG-6506. https://maven.apache.org/docs/3.6.1/release-notes.html > Upgrade Maven to 3.6.1 > -- > > Key: SPARK-27467 > URL: https://issues.apache.org/jira/browse/SPARK-27467 > Project: Spark > Issue Type: Sub-task > Components: Build >Affects Versions: 3.0.0 >Reporter: Dongjoon Hyun >Priority: Major > > This issue aims to upgrade Maven to 3.6.1 to bring JDK9+ patches like > MNG-6506. For the full release note, please see the following. > https://maven.apache.org/docs/3.6.1/release-notes.html
[jira] [Updated] (SPARK-27467) Upgrade Maven to 3.6.1
[ https://issues.apache.org/jira/browse/SPARK-27467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-27467: -- Issue Type: Sub-task (was: Improvement) Parent: SPARK-24417 > Upgrade Maven to 3.6.1 > -- > > Key: SPARK-27467 > URL: https://issues.apache.org/jira/browse/SPARK-27467 > Project: Spark > Issue Type: Sub-task > Components: Build >Affects Versions: 3.0.0 >Reporter: Dongjoon Hyun >Priority: Major > > This issue aims to upgrade Maven to 3.6.1 to bring JDK9+ patches like MNG-6506. > https://maven.apache.org/docs/3.6.1/release-notes.html
[jira] [Updated] (SPARK-27430) broadcast hint should be respected for broadcast nested loop join
[ https://issues.apache.org/jira/browse/SPARK-27430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan updated SPARK-27430: Summary: broadcast hint should be respected for broadcast nested loop join (was: BroadcastNestedLoopJoinExec can build any side whatever the join type is) > broadcast hint should be respected for broadcast nested loop join > - > > Key: SPARK-27430 > URL: https://issues.apache.org/jira/browse/SPARK-27430 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27467) Upgrade Maven to 3.6.1
Dongjoon Hyun created SPARK-27467: - Summary: Upgrade Maven to 3.6.1 Key: SPARK-27467 URL: https://issues.apache.org/jira/browse/SPARK-27467 Project: Spark Issue Type: Improvement Components: Build Affects Versions: 3.0.0 Reporter: Dongjoon Hyun This issue aims to upgrade Maven to 3.6.1 to bring JDK9+ patches like MNG-6506. https://maven.apache.org/docs/3.6.1/release-notes.html
[jira] [Resolved] (SPARK-27062) CatalogImpl.refreshTable should register query in cache with received tableName
[ https://issues.apache.org/jira/browse/SPARK-27062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] William Wong resolved SPARK-27062. -- Resolution: Duplicate > CatalogImpl.refreshTable should register query in cache with received > tableName > --- > > Key: SPARK-27062 > URL: https://issues.apache.org/jira/browse/SPARK-27062 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.2 >Reporter: William Wong >Priority: Minor > Labels: easyfix, pull-request-available > Original Estimate: 2h > Remaining Estimate: 2h > > If _CatalogImpl.refreshTable()_ method is invoked against a cached table, > this method would first uncache corresponding query in the shared state cache > manager, and then cache it back to refresh the cache copy. > However, the table was recached with only 'table name'. The database name > will be missed. Therefore, if cached table is not on the default database, > the recreated cache may refer to a different table. For example, we may see > the cached table name in driver's storage page will be changed after table > refreshing. > Here is related code on github for your reference. > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala] > > > {code:java} > override def refreshTable(tableName: String): Unit = { > val tableIdent = > sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) > val tableMetadata = > sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent) > val table = sparkSession.table(tableIdent) > if (tableMetadata.tableType == CatalogTableType.VIEW) { > // Temp or persistent views: refresh (or invalidate) any metadata/data > cached > // in the plan recursively. > table.queryExecution.analyzed.refresh() > } else { > // Non-temp tables: refresh the metadata cache. 
> sessionCatalog.refreshTable(tableIdent) > } > // If this table is cached as an InMemoryRelation, drop the original > // cached version and make the new version cached lazily. > if (isCached(table)) { > // Uncache the logicalPlan. > sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true, > blocking = true) > // Cache it again. > sparkSession.sharedState.cacheManager.cacheQuery(table, > Some(tableIdent.table)) > } > } > {code} > > In contrast, CatalogImpl.cacheTable caches the table with the received > _tableName_, instead of _tableIdent.table_: > {code:java} > override def cacheTable(tableName: String): Unit = { > sparkSession.sharedState.cacheManager.cacheQuery(sparkSession.table(tableName), > Some(tableName)) } > {code} > > Therefore, I would like to propose aligning the behavior: the refreshTable > method should reuse the received _tableName_. Here is the proposed change: > > {code:java} > sparkSession.sharedState.cacheManager.cacheQuery(table, > Some(tableIdent.table)) > {code} > to > {code:java} > sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableName)){code} >
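The naming mismatch described in this issue is easy to see in a toy model (plain Python, not Spark code): cacheTable registers the cache under the caller-supplied name, while the current refreshTable re-registers it under only the table part of the identifier.

```python
# Toy model of the mismatch: cacheTable registers "db1.tab1", but
# refreshTable re-caches under just "tab1", dropping the database prefix.
cache_names = {}

def cache_table(table_name):
    cache_names[table_name] = "InMemoryRelation"

def refresh_table_current(table_name):
    # Mimics refreshTable's use of tableIdent.table (table part only)
    table_part = table_name.split(".")[-1]
    cache_names.pop(table_name, None)     # uncache under the full name
    cache_names[table_part] = "InMemoryRelation"  # recache; name drifts

cache_table("db1.tab1")
refresh_table_current("db1.tab1")
print(list(cache_names))  # ['tab1'] -- the database prefix was lost
```

Reusing the received name when re-caching, as the proposed change does, keeps the entry stable across refreshes (and matches what the driver's storage page displays).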
[jira] [Commented] (SPARK-27458) Remind developer using IntelliJ to update maven version
[ https://issues.apache.org/jira/browse/SPARK-27458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818100#comment-16818100 ] William Wong commented on SPARK-27458: -- PR ([https://github.com/apache/spark-website/pull/195]) was created. > Remind developer using IntelliJ to update maven version > --- > > Key: SPARK-27458 > URL: https://issues.apache.org/jira/browse/SPARK-27458 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 3.0.0 >Reporter: William Wong >Priority: Major > > I am using IntelliJ to update a few Spark source files. I tried to follow the > guide at '[http://spark.apache.org/developer-tools.html]' to set up an > IntelliJ project for Spark. However, the project failed to build, due to > missing classes generated via antlr in the sql/catalyst project. I tried > clicking the button 'Generate Sources and Update Folders for all Projects', > but it did not help; the antlr task was not triggered as expected. > I checked the IntelliJ log file and found that this was because I had not set > the maven version properly, and the 'Generate Sources and Update Folders for > all Projects' process failed silently: > > _2019-04-14 16:05:24,796 [ 314609] INFO - #org.jetbrains.idea.maven - > [WARNING] Rule 0: org.apache.maven.plugins.enforcer.RequireMavenVersion > failed with message:_ > _Detected Maven Version: 3.3.9 is not in the allowed range 3.6.0._ > _2019-04-14 16:05:24,813 [ 314626] INFO - #org.jetbrains.idea.maven - > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute > goal org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M2:enforce > (enforce-versions) on project spark-parent_2.12: Some Enforcer rules have > failed. 
Look above for specific messages explaining why the rule failed._ > _java.lang.RuntimeException: > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute > goal org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M2:enforce > (enforce-versions) on project spark-parent_2.12: Some Enforcer rules have > failed. Look above for specific messages explaining why the rule failed._ > > To be honest, failing an action silently is arguably an IntelliJ bug. However, > enhancing the page '[http://spark.apache.org/developer-tools.html]' to > remind developers to check their Maven version may save new joiners some > time.
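The enforcer rule that failed above is a simple minimum-version check: the detected Maven 3.3.9 falls outside the allowed range starting at 3.6.0. A minimal sketch of such a check (illustrative only; real Maven version ordering also handles qualifiers like `-SNAPSHOT`, which this ignores):

```python
def version_tuple(v):
    # "3.3.9" -> (3, 3, 9); plain dotted numeric versions only
    return tuple(int(part) for part in v.split("."))

def meets_minimum(detected, minimum):
    # Tuple comparison gives numeric, component-wise ordering.
    return version_tuple(detected) >= version_tuple(minimum)

print(meets_minimum("3.3.9", "3.6.0"))   # False: this is the failing build
print(meets_minimum("3.6.0", "3.6.0"))   # True
print(meets_minimum("3.10.1", "3.6.0"))  # True: numeric, not string, compare
```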
[jira] [Updated] (SPARK-27466) LEAD function with 'ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING' causes exception in Spark
[ https://issues.apache.org/jira/browse/SPARK-27466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zoltan updated SPARK-27466: --- Environment: Spark version 2.2.0.2.6.4.92-2 Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112) was: __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.2.0.2.6.4.92-2 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112) Type in expressions to have them evaluated. Description: *1. Create a table in Hive:* CREATE TABLE tab1( col1 varchar(1), col2 varchar(1) ) PARTITIONED BY ( col3 varchar(1) ) LOCATION 'hdfs://server1/data/tab1' *2. Query the Table in Spark:* *2.1: Simple query, no exception thrown:* scala> spark.sql("SELECT * from schema1.tab1").show() +-+---++ |col1|col2|col3| +-+---++ +-+---++ *2.2.: Query causing exception:* scala> spark.sql("*SELECT (LEAD(col1) OVER ( PARTITION BY col3 ORDER BY col1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING*)) from schema1.tab1") {color:#d04437}*org.apache.spark.sql.AnalysisException: Window Frame ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING must match the required frame ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING;*{color} at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30$$anonfun$applyOrElse$11.applyOrElse(Analyzer.scala:2219) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30$$anonfun$applyOrElse$11.applyOrElse(Analyzer.scala:2215) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:258) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:249) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30.applyOrElse(Analyzer.scala:2215) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30.applyOrElse(Analyzer.scala:2214) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at
[jira] [Created] (SPARK-27466) LEAD function with 'ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING' causes exception in Spark
Zoltan created SPARK-27466: -- Summary: LEAD function with 'ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING' causes exception in Spark Key: SPARK-27466 URL: https://issues.apache.org/jira/browse/SPARK-27466 Project: Spark Issue Type: Bug Components: Spark Shell Affects Versions: 2.2.0 Environment: __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.2.0.2.6.4.92-2 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112) Type in expressions to have them evaluated. Reporter: Zoltan *1. Create a table in Hive:* CREATE TABLE tab1( col1 varchar(1), col2 varchar(1) ) PARTITIONED BY ( col3 varchar(1) ) LOCATION 'hdfs://server1/data/tab1' *2. Query the Table in Spark:* Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.2.0.2.6.4.92-2 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112) Type in expressions to have them evaluated. Type :help for more information. *2.1: Simple query, no exception thrown:* scala> spark.sql("SELECT * from schema1.tab1").show() ++++ |col1|col2|col3| ++++ ++++ *2.2.: Query causing exception:* scala> spark.sql("*SELECT (LEAD(col1) OVER ( PARTITION BY col3 ORDER BY col1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING*)) from schema1.tab1") {color:#d04437}*org.apache.spark.sql.AnalysisException: Window Frame ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING must match the required frame ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING;*{color} at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30$$anonfun$applyOrElse$11.applyOrElse(Analyzer.scala:2219) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30$$anonfun$applyOrElse$11.applyOrElse(Analyzer.scala:2215) at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293) at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:258) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:249) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30.applyOrElse(Analyzer.scala:2215) at
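The analyzer error above is by design: LEAD(col, 1) is defined to read the value exactly one row ahead, so Spark pins its window frame to ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING and rejects a user-specified UNBOUNDED frame. A plain-Python sketch of that semantics (illustrative only; this `lead` is a stand-in, not Spark's implementation):

```python
def lead(values, offset=1, default=None):
    # For each row, take the value exactly `offset` rows ahead within the
    # partition, or `default` past the end -- i.e. a frame of exactly the
    # single row `offset` FOLLOWING.
    n = len(values)
    return [values[i + offset] if i + offset < n else default
            for i in range(n)]

print(lead(["a", "b", "c"]))  # ['b', 'c', None]
```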
[jira] [Comment Edited] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818070#comment-16818070 ] Prabhjot Singh Bharaj edited comment on SPARK-27409 at 4/15/19 3:30 PM: > "kafka.ssl.keystore.location", "non-existent" This was set to test the flow. > Why do you write "df = sc.sql.readStream..." (its just not working) I'm doing this in pyspark, where I get the spark context. Yes, I'm following the kafka integration guide linked. was (Author: pbharaj): > "kafka.ssl.keystore.location", "non-existent" This was set to test the flow. > Why do you write "df = sc.sql.readStream..." (its just not working) I'm doing this in pyspark, where I get the spark context. > Micro-batch support for Kafka Source in Spark 2.3 > - > > Key: SPARK-27409 > URL: https://issues.apache.org/jira/browse/SPARK-27409 > Project: Spark > Issue Type: Question > Components: Structured Streaming >Affects Versions: 2.3.2 >Reporter: Prabhjot Singh Bharaj >Priority: Major > > It seems with this change - > [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50] > in Spark 2.3 for Kafka Source Provider, a Kafka source can not be run in > micro-batch mode but only in continuous mode. Is that understanding correct ? > {code:java} > E Py4JJavaError: An error occurred while calling o217.load. 
> E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:717) > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:566) > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:549) > E at > org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62) > E at > org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314) > E at > org.apache.spark.sql.kafka010.KafkaOffsetReader.(KafkaOffsetReader.scala:78) > E at > org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130) > E at > org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43) > E at > org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185) > E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > E at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > E at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > E at java.lang.reflect.Method.invoke(Method.java:498) > E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > E at py4j.Gateway.invoke(Gateway.java:282) > E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > E at py4j.commands.CallCommand.execute(CallCommand.java:79) > E at py4j.GatewayConnection.run(GatewayConnection.java:238) > E at java.lang.Thread.run(Thread.java:748) > E Caused by: org.apache.kafka.common.KafkaException: > org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: > non-existent (No such file or directory) > E at > org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44) > E at > 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93) > E at > org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51) > E at > org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84) > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:657) > E ... 19 more > E Caused by: org.apache.kafka.common.KafkaException: > java.io.FileNotFoundException: non-existent (No such file or directory) > E at > org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121) > E at > org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41) > E ... 23 more > E Caused by: java.io.FileNotFoundException: non-existent (No such file or > directory) > E at java.io.FileInputStream.open0(Native Method) > E at java.io.FileInputStream.open(FileInputStream.java:195) > E at java.io.FileInputStream.(FileInputStream.java:138) > E at java.io.FileInputStream.(FileInputStream.java:93) > E at > org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216) > E at > org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201) > E at > org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137) > E at >
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818070#comment-16818070 ] Prabhjot Singh Bharaj commented on SPARK-27409: --- > "kafka.ssl.keystore.location", "non-existent" This was set to test the flow. > Why do you write "df = sc.sql.readStream..." (its just not working) I'm doing this in pyspark, where I get the spark context. > Micro-batch support for Kafka Source in Spark 2.3 > - > > Key: SPARK-27409 > URL: https://issues.apache.org/jira/browse/SPARK-27409 > Project: Spark > Issue Type: Question > Components: Structured Streaming >Affects Versions: 2.3.2 >Reporter: Prabhjot Singh Bharaj >Priority: Major > > It seems with this change - > [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50] > in Spark 2.3 for Kafka Source Provider, a Kafka source can not be run in > micro-batch mode but only in continuous mode. Is that understanding correct ? > {code:java} > E Py4JJavaError: An error occurred while calling o217.load. 
> E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:717) > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:566) > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:549) > E at > org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62) > E at > org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314) > E at > org.apache.spark.sql.kafka010.KafkaOffsetReader.(KafkaOffsetReader.scala:78) > E at > org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130) > E at > org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43) > E at > org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185) > E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > E at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > E at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > E at java.lang.reflect.Method.invoke(Method.java:498) > E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > E at py4j.Gateway.invoke(Gateway.java:282) > E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > E at py4j.commands.CallCommand.execute(CallCommand.java:79) > E at py4j.GatewayConnection.run(GatewayConnection.java:238) > E at java.lang.Thread.run(Thread.java:748) > E Caused by: org.apache.kafka.common.KafkaException: > org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: > non-existent (No such file or directory) > E at > org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44) > E at > 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93) > E at > org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51) > E at > org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84) > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:657) > E ... 19 more > E Caused by: org.apache.kafka.common.KafkaException: > java.io.FileNotFoundException: non-existent (No such file or directory) > E at > org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121) > E at > org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41) > E ... 23 more > E Caused by: java.io.FileNotFoundException: non-existent (No such file or > directory) > E at java.io.FileInputStream.open0(Native Method) > E at java.io.FileInputStream.open(FileInputStream.java:195) > E at java.io.FileInputStream.(FileInputStream.java:138) > E at java.io.FileInputStream.(FileInputStream.java:93) > E at > org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216) > E at > org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201) > E at > org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137) > E at > org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119) > E ... 24 more{code} > When running a simple data stream loader for kafka without an SSL cert, it > goes through this code block - > > {code:java} > ... > ... > org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130) > E at >
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818056#comment-16818056 ] Gabor Somogyi commented on SPARK-27409: --- Are you really sure you've followed the following integration guide? https://spark.apache.org/docs/2.3.2/structured-streaming-kafka-integration.html > Micro-batch support for Kafka Source in Spark 2.3 > - > > Key: SPARK-27409 > URL: https://issues.apache.org/jira/browse/SPARK-27409 > Project: Spark > Issue Type: Question > Components: Structured Streaming >Affects Versions: 2.3.2 >Reporter: Prabhjot Singh Bharaj >Priority: Major > > It seems with this change - > [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50] > in Spark 2.3 for Kafka Source Provider, a Kafka source can not be run in > micro-batch mode but only in continuous mode. Is that understanding correct ? > {code:java} > E Py4JJavaError: An error occurred while calling o217.load. 
> E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:717) > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:566) > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:549) > E at > org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62) > E at > org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314) > E at > org.apache.spark.sql.kafka010.KafkaOffsetReader.(KafkaOffsetReader.scala:78) > E at > org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130) > E at > org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43) > E at > org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185) > E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > E at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > E at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > E at java.lang.reflect.Method.invoke(Method.java:498) > E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > E at py4j.Gateway.invoke(Gateway.java:282) > E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > E at py4j.commands.CallCommand.execute(CallCommand.java:79) > E at py4j.GatewayConnection.run(GatewayConnection.java:238) > E at java.lang.Thread.run(Thread.java:748) > E Caused by: org.apache.kafka.common.KafkaException: > org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: > non-existent (No such file or directory) > E at > org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44) > E at > 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93) > E at > org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51) > E at > org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84) > E at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:657) > E ... 19 more > E Caused by: org.apache.kafka.common.KafkaException: > java.io.FileNotFoundException: non-existent (No such file or directory) > E at > org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121) > E at > org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41) > E ... 23 more > E Caused by: java.io.FileNotFoundException: non-existent (No such file or > directory) > E at java.io.FileInputStream.open0(Native Method) > E at java.io.FileInputStream.open(FileInputStream.java:195) > E at java.io.FileInputStream.(FileInputStream.java:138) > E at java.io.FileInputStream.(FileInputStream.java:93) > E at > org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216) > E at > org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201) > E at > org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137) > E at > org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119) > E ... 24 more{code} > When running a simple data stream loader for kafka without an SSL cert, it > goes through this code block - > > {code:java} > ... > ... > org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130) > E at > org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43) > E at >
[jira] [Created] (SPARK-27465) Kafka Client 0.11.0.0 is not Supporting the kafkatestutils package
Praveen created SPARK-27465: --- Summary: Kafka Client 0.11.0.0 is not Supporting the kafkatestutils package Key: SPARK-27465 URL: https://issues.apache.org/jira/browse/SPARK-27465 Project: Spark Issue Type: Bug Components: Java API Affects Versions: 2.3.3 Reporter: Praveen Hi Team, We are getting the below exceptions with Kafka Client Version 0.11.0.0 for the KafkaTestUtils Package. But it's working fine when we use Kafka Client Version 0.10.0.1. Please suggest the way forward. We are using the package " import org.apache.spark.streaming.kafka010.KafkaTestUtils;" ERROR: java.lang.NoSuchMethodError: kafka.server.KafkaServer$.$lessinit$greater$default$2()Lkafka/utils/Time; at org.apache.spark.streaming.kafka010.KafkaTestUtils$$anonfun$setupEmbeddedKafkaServer$2.apply(KafkaTestUtils.scala:110) at org.apache.spark.streaming.kafka010.KafkaTestUtils$$anonfun$setupEmbeddedKafkaServer$2.apply(KafkaTestUtils.scala:107) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2234) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2226) at org.apache.spark.streaming.kafka010.KafkaTestUtils.setupEmbeddedKafkaServer(KafkaTestUtils.scala:107) at org.apache.spark.streaming.kafka010.KafkaTestUtils.setup(KafkaTestUtils.scala:122) at com.netcracker.rms.smart.esp.ESPTestEnv.prepareKafkaTestUtils(ESPTestEnv.java:203) at com.netcracker.rms.smart.esp.ESPTestEnv.setUp(ESPTestEnv.java:157) at com.netcracker.rms.smart.esp.TestEventStreamProcessor.setUp(TestEventStreamProcessor.java:58)
[jira] [Commented] (SPARK-25250) Race condition with tasks running when new attempt for same stage is created leads to other task in the next attempt running on the same partition id retry multiple times
[ https://issues.apache.org/jira/browse/SPARK-25250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818053#comment-16818053 ] Thomas Graves commented on SPARK-25250: --- [~cloud_fan] can you please add details as to where and why this was reverted? This went into multiple branches 2.4.1 has already been released so I don't necessarily agree with a revert here. I would prefer to see another bug since its been released already. > Race condition with tasks running when new attempt for same stage is created > leads to other task in the next attempt running on the same partition id > retry multiple times > -- > > Key: SPARK-25250 > URL: https://issues.apache.org/jira/browse/SPARK-25250 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Affects Versions: 2.3.1 >Reporter: Parth Gandhi >Assignee: Parth Gandhi >Priority: Major > Fix For: 2.3.4, 2.4.1, 3.0.0 > > > We recently had a scenario where a race condition occurred when a task from > previous stage attempt just finished before new attempt for the same stage > was created due to fetch failure, so the new task created in the second > attempt on the same partition id was retrying multiple times due to > TaskCommitDenied Exception without realizing that the task in earlier attempt > was already successful. > For example, consider a task with partition id 9000 and index 9000 running in > stage 4.0. We see a fetch failure so thus, we spawn a new stage attempt 4.1. > Just within this timespan, the above task completes successfully, thus, > marking the partition id 9000 as complete for 4.0. However, as stage 4.1 has > not yet been created, the taskset info for that stage is not available to the > TaskScheduler so, naturally, the partition id 9000 has not been marked > completed for 4.1. Stage 4.1 now spawns task with index 2000 on the same > partition id 9000. 
This task fails due to CommitDeniedException and, since it > does not see the corresponding partition id as having been marked successful, it > keeps retrying multiple times until the job finally succeeds. It doesn't > cause any job failures because the DAG scheduler tracks the partitions > separately from the task set managers. > > Steps to Reproduce: > # Run any large job involving a shuffle operation. > # When the ShuffleMap stage finishes and the ResultStage begins running, > cause this stage to throw a fetch failure exception (try deleting certain > shuffle files on any host). > # Observe the task attempt numbers for the next stage attempt. Please note > that this issue is an intermittent one, so it might not happen all the time.
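The interplay described above can be sketched with a toy model (illustrative only, not Spark's scheduler code; the variable names and partition id are invented to mirror the example in the report):

```python
dag_completed = set()          # partitions the DAG scheduler knows are finished
attempt_41_completed = set()   # what the task set for stage attempt 4.1 believes

# A task from stage attempt 4.0 finishes partition 9000 just after the fetch
# failure spawned attempt 4.1, so only the DAG scheduler records it.
dag_completed.add(9000)

# Attempt 4.1 schedules partition 9000 again because its own view is empty.
should_resubmit = 9000 not in attempt_41_completed

print(should_resubmit)        # True: 4.1 keeps retrying the partition
print(9000 in dag_completed)  # True: the job itself still succeeds
```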
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818051#comment-16818051 ] Gabor Somogyi commented on SPARK-27409: --- I don't understand a couple of things in your code: * Why do you set "kafka.ssl.keystore.location", "non-existent"? * Why do you write "df = sc.sql.readStream..."? (it's just not working) {code:java} scala> sc.sql.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("kafka.security.protocol", "SSL").option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", "non-existent").option("subscribe", "no existing topic").load() :25: error: value sql is not a member of org.apache.spark.SparkContext sc.sql.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("kafka.security.protocol", "SSL").option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", "non-existent").option("subscribe", "no existing topic").load() ^ {code} It seems to work when I execute the following: {code:java} scala> spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("kafka.security.protocol", "SSL").option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", "non-existent").option("subscribe", "no existing topic").load() res0: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 
5 more fields] scala> res0.writeStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("checkpointLocation", "/tmp").start() res2: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@389f4282 scala> 2019-04-15 17:04:03 WARN KafkaOffsetReader:87 - Error in attempt 1 getting Kafka offsets: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:799) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:615) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:596) at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62) at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:85) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:199) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:288) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:287) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:287) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77) at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:286) at 
org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:197) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:197) at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:255) at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:196) at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:195) at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:190) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.getOrCreateInitialPartitionOffsets(KafkaMicroBatchReader.scala:190) {code} [~pbharaj] Have you tested it on master branch? > Micro-batch support for Kafka Source in Spark 2.3 > - > > Key: SPARK-27409 > URL: https://issues.apache.org/jira/browse/SPARK-27409 > Project: Spark > Issue Type: Question > Components: Structured Streaming >
[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support
[ https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817990#comment-16817990 ] Robert Joseph Evans commented on SPARK-27396: - There are actually a few public-facing APIs I think we would need to expose corresponding to different use cases. The ML use case needs access to the data for easy transfer, but also needs to know when it has all of the data it is expecting. Typically this is done by converting the dataset into an {{RDD}}, which can expose an iterator so you know you have all of the data. The simplest, and probably the most memory-efficient, solution is to expose the underlying {{RDD[ColumnarBatch]}}. We could do this with a new API on Dataset similar to {{rdd()}} or {{javaRdd()}}, perhaps something called {{columnarRdd()}} or {{columnarJavaRdd()}}, but I would need to think about the exact name a bit. This is the use case that really drives my wanting a columnar shuffle implementation on the CPU, because the typical pattern is to load the data from a file and then repartition it for training before doing the training. Many of the input formats (Parquet and ORC) are already columnar, so we could support that entire use case without transforming the data from columns to rows. An RDD-based API, however, is far from ideal for data processing, as we would lose all of the advantages of the DataSet for query optimization and rewriting. For that use case I think we should provide a columnar UDF API, based off of the pandas UDF APIs. At a minimum I would say that we start off with a scalar columnar UDF that would look like a regular UDF, but the inputs, instead of being simple types, would be {{ColumnVector}}s, and so would the output. But because a ColumnVector is not typed itself we could not infer the output format from the API, so we would need users to declare it for us, something like:
{code:java} def columnar_udf((a) -> /* do something with a */, BooleanType) {code} In the future we could add support for the more complex forms, like grouped map and grouped aggregate UDFs, if they prove popular. These should be fairly simple to do, but I don't want to commit to them in the first version of the code; we would, however, want the API written with that in mind. The final API is to allow advanced framework users to add or replace the columnar processing of a physical plan. This would allow someone to add, say, GPU support as the backend for data processing, or SIMD-optimized CPU code, or any other type of accelerated columnar processing. This would be done through the experimental and unstable `spark.sql.extensions` config and the SparkSessionExtensions API. It would primarily provide some places to insert a `Rule[SparkPlan]` into the execution phase of Catalyst so framework writers could use the APIs I described previously to implement columnar versions of the various operators. This would also allow the operators to implement columnar-specific plan optimizations using the same API. > SPIP: Public APIs for extended Columnar Processing Support > -- > > Key: SPARK-27396 > URL: https://issues.apache.org/jira/browse/SPARK-27396 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Robert Joseph Evans >Priority: Major > > *Q1.* What are you trying to do? Articulate your objectives using absolutely > no jargon. > > The Dataset/DataFrame API in Spark currently only exposes to users one row at > a time when processing data. The goals of this are to > > # Expose to end users a new option of processing the data in a columnar > format, multiple rows at a time, with the data organized into contiguous > arrays in memory. > # Make any transitions between the columnar memory layout and a row based > layout transparent to the end user. 
> # Allow for simple data exchange with other systems, DL/ML libraries, > pandas, etc. by having clean APIs to transform the columnar data into an > Apache Arrow compatible layout. > # Provide a plugin mechanism for columnar processing support so an advanced > user could avoid data transition between columnar and row based processing > even through shuffles. This means we should at least support pluggable APIs > so an advanced end user can implement the columnar partitioning themselves, > and provide the glue necessary to shuffle the data still in a columnar format. > # Expose new APIs that allow advanced users or frameworks to implement > columnar processing either as UDFs, or by adjusting the physical plan to do > columnar processing. If the latter is too controversial we can move it to > another SPIP, but we plan to implement some accelerated computing in parallel > with this feature to be sure
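Since a {{ColumnVector}} carries no type information, the comment above notes that a columnar UDF would have to declare its output type explicitly. A hypothetical Python sketch of such a declaration mechanism (this is not Spark's actual API; all names are invented for illustration):

```python
# Hypothetical sketch only: a decorator that attaches a declared output
# type to a batch-at-a-time (columnar) function, mirroring the idea that
# the UDF must state its result type because column vectors are untyped.
def columnar_udf(return_type):
    def wrap(fn):
        fn.return_type = return_type  # recorded for the planner to consult
        return fn
    return wrap

@columnar_udf("boolean")
def is_positive(column):
    # operates on a whole column (a batch of values) at a time
    return [v > 0 for v in column]
```

The point of the sketch is only the shape of the declaration: the function body maps a column to a column, while the declared type travels alongside it.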
[jira] [Assigned] (SPARK-27459) Revise the exception message of schema inference failure in file source V2
[ https://issues.apache.org/jira/browse/SPARK-27459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-27459: --- Assignee: Gengliang Wang > Revise the exception message of schema inference failure in file source V2 > -- > > Key: SPARK-27459 > URL: https://issues.apache.org/jira/browse/SPARK-27459 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Assignee: Gengliang Wang >Priority: Trivial > > Since > https://github.com/apache/spark/pull/23383/files#diff-db4a140579c1ac4b1dbec7fe5057eecaR36, > the exception message of schema inference failure in file source V2 is > `tableName`, which is equivalent to `shortName + path`. > While in file source V1, the message is `Unable to infer schema from > ORC/CSV/JSON...`. > We should make the message in V2 consistent with V1, so that in the future > migration the related test cases don't need to be modified. > https://github.com/apache/spark/pull/24058#pullrequestreview-226364350 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-27459) Revise the exception message of schema inference failure in file source V2
[ https://issues.apache.org/jira/browse/SPARK-27459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-27459. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24369 [https://github.com/apache/spark/pull/24369] > Revise the exception message of schema inference failure in file source V2 > -- > > Key: SPARK-27459 > URL: https://issues.apache.org/jira/browse/SPARK-27459 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Assignee: Gengliang Wang >Priority: Trivial > Fix For: 3.0.0 > > > Since > https://github.com/apache/spark/pull/23383/files#diff-db4a140579c1ac4b1dbec7fe5057eecaR36, > the exception message of schema inference failure in file source V2 is > `tableName`, which is equivalent to `shortName + path`. > While in file source V1, the message is `Unable to infer schema from > ORC/CSV/JSON...`. > We should make the message in V2 consistent with V1, so that in the future > migration the related test cases don't need to be modified. > https://github.com/apache/spark/pull/24058#pullrequestreview-226364350
[jira] [Commented] (SPARK-27330) ForeachWriter is not being closed once a batch is aborted
[ https://issues.apache.org/jira/browse/SPARK-27330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817926#comment-16817926 ] Gabor Somogyi commented on SPARK-27330: --- Cool, ping me if you need review... > ForeachWriter is not being closed once a batch is aborted > - > > Key: SPARK-27330 > URL: https://issues.apache.org/jira/browse/SPARK-27330 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Eyal Zituny >Priority: Major > > in cases where a micro batch is being killed (interrupted), not during actual > processing done by the {{ForeachDataWriter}} (when iterating the iterator), > {{DataWritingSparkTask}} will handle the interruption and call > {{dataWriter.abort()}} > the problem is that {{ForeachDataWriter}} has an empty implementation for the > abort method. > due to that, I have tasks which uses the foreach writer and according to the > [documentation|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach] > they are opening connections in the "open" method and closing the > connections on the "close" method but since the "close" is never called, the > connections are never closed > this wasn't the behavior pre spark 2.4 > my suggestion is to call {{ForeachWriter.close()}} when > {{DataWriter.abort()}} is called, and exception should also be provided in > order to notify the foreach writer that this task has failed > > {code} > stack trace from the exception i have encountered: > org.apache.spark.TaskKilledException: null > at > org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117) > at > 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116) > at > org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394) > at > org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146) > at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67) > at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66) > {code}
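The fix suggested in the issue, forwarding the failure from {{DataWriter.abort()}} to {{ForeachWriter.close()}}, can be sketched in plain Python; all classes below are simplified stand-ins for illustration, not Spark's real implementations:

```python
# Hypothetical sketch of the proposed fix: a data writer whose abort()
# forwards the failure to the user's ForeachWriter.close(), so resources
# opened in open() are released even when a batch is killed mid-flight.
class ForeachWriter:
    def open(self):
        self.connection_open = True  # user code opens connections here

    def close(self, error):
        # user code releases resources here; `error` explains the failure
        self.connection_open = False
        self.last_error = error

class ForeachDataWriter:
    def __init__(self, writer):
        self.writer = writer
        writer.open()

    def commit(self):
        self.writer.close(None)  # normal completion: no error

    def abort(self, error):
        # previously a no-op in ForeachDataWriter; the proposal is to
        # close the user's writer here and pass along the failure
        self.writer.close(error)

writer = ForeachWriter()
task = ForeachDataWriter(writer)
task.abort(RuntimeError("task killed"))
```

With this shape, the "open in open(), close in close()" contract from the programming guide holds on both the commit and abort paths.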
[jira] [Commented] (SPARK-27330) ForeachWriter is not being closed once a batch is aborted
[ https://issues.apache.org/jira/browse/SPARK-27330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817918#comment-16817918 ] Eyal Zituny commented on SPARK-27330: - [~gsomogyi] yes, almost done with it > ForeachWriter is not being closed once a batch is aborted > - > > Key: SPARK-27330 > URL: https://issues.apache.org/jira/browse/SPARK-27330 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Eyal Zituny >Priority: Major > > in cases where a micro batch is being killed (interrupted), not during actual > processing done by the {{ForeachDataWriter}} (when iterating the iterator), > {{DataWritingSparkTask}} will handle the interruption and call > {{dataWriter.abort()}} > the problem is that {{ForeachDataWriter}} has an empty implementation for the > abort method. > due to that, I have tasks which uses the foreach writer and according to the > [documentation|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach] > they are opening connections in the "open" method and closing the > connections on the "close" method but since the "close" is never called, the > connections are never closed > this wasn't the behavior pre spark 2.4 > my suggestion is to call {{ForeachWriter.close()}} when > {{DataWriter.abort()}} is called, and exception should also be provided in > order to notify the foreach writer that this task has failed > > {code} > stack trace from the exception i have encountered: > org.apache.spark.TaskKilledException: null > at > org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117) > at > 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116) > at > org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394) > at > org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146) > at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67) > at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66) > {code}
[jira] [Commented] (SPARK-27330) ForeachWriter is not being closed once a batch is aborted
[ https://issues.apache.org/jira/browse/SPARK-27330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817902#comment-16817902 ] Gabor Somogyi commented on SPARK-27330: --- [~eyalzit] are you working on this? Happy to file a PR if don't have time. > ForeachWriter is not being closed once a batch is aborted > - > > Key: SPARK-27330 > URL: https://issues.apache.org/jira/browse/SPARK-27330 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Eyal Zituny >Priority: Major > > in cases where a micro batch is being killed (interrupted), not during actual > processing done by the {{ForeachDataWriter}} (when iterating the iterator), > {{DataWritingSparkTask}} will handle the interruption and call > {{dataWriter.abort()}} > the problem is that {{ForeachDataWriter}} has an empty implementation for the > abort method. > due to that, I have tasks which uses the foreach writer and according to the > [documentation|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach] > they are opening connections in the "open" method and closing the > connections on the "close" method but since the "close" is never called, the > connections are never closed > this wasn't the behavior pre spark 2.4 > my suggestion is to call {{ForeachWriter.close()}} when > {{DataWriter.abort()}} is called, and exception should also be provided in > order to notify the foreach writer that this task has failed > > {code} > stack trace from the exception i have encountered: > org.apache.spark.TaskKilledException: null > at > org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117) > at > 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116) > at > org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394) > at > org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146) > at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67) > at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66) > {code}
[jira] [Created] (SPARK-27464) Add Constant instead of referring string literal used from many places
Shivu Sondur created SPARK-27464: Summary: Add Constant instead of referring string literal used from many places Key: SPARK-27464 URL: https://issues.apache.org/jira/browse/SPARK-27464 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.1 Reporter: Shivu Sondur Add a constant instead of referring to the string literal "spark.buffer.pageSize" from many places.
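The cleanup requested here is a plain constant-extraction refactor; a hypothetical Python sketch of the pattern (names and the `conf` dictionary are invented for illustration, not Spark's config API):

```python
# Hypothetical sketch: instead of repeating the literal
# "spark.buffer.pageSize" at every call site, define it once as a named
# constant and have every lookup go through that single definition.
BUFFER_PAGE_SIZE_KEY = "spark.buffer.pageSize"

def get_page_size(conf):
    # one place to change if the key is ever renamed; typos in the key
    # become compile-time NameErrors instead of silent config misses
    return conf.get(BUFFER_PAGE_SIZE_KEY, "1m")
```

The benefit is that a rename or typo touches one definition rather than every occurrence of the literal.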
[jira] [Updated] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Martin updated SPARK-27463: - Description: h2. *Background and Motivation* Recently there has been a great deal of work in PySpark to improve interoperability with the Pandas library. This work has allowed users to write User Defined Functions (UDFs) in Pandas which can then be applied to a Spark DataFrame. The benefit here is that it allows users to combine the functionality of Pandas with the parallelisation abilities of Spark. In addition, these new Pandas UDFs have significantly lower overhead than traditional UDFs as they operate on a batch of data at a time (i.e. they are vectorised) and they use Apache Arrow for serialisation between the JVM and Python processes. As of Spark 2.3 two types of Pandas UDF are offered. Scalar UDFs effectively offer a map operation at the row level, while Grouped Map UDFs allow a map operation on a group of data. This functionality has proved successful in allowing users to integrate Spark with existing Pandas workflows; however, there are situations where the existing functionality offered is not sufficient. One such case is analogous to the existing Cogroup functionality available on RDDs and DataSets and was proposed by Li Jin on the Spark-Dev mailing list [1]. In this case, the user would like to group two Spark DataFrames by a common key and then apply a python function to each group. This python function would take two pandas DataFrames as its arguments and would return an arbitrary-length Pandas DataFrame. To give a concrete example of the usefulness of this functionality, consider the use case of performing an as-of join between two distinct DataFrames. This is something that has traditionally been very difficult to do in Spark (and indeed in SQL in general) [2] but which has good support in Pandas [3]. 
If Cogroup-like functionality were available in PySpark then one could simply write a Pandas function to perform the as-of joining which could then be applied to two (appropriately grouped) DataFrames. This proposal therefore advocates introducing a new API call which would allow for a Cogrouped Pandas UDF. [1][http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3ccagy9duxt569bpgp0wsc2esjgcoo5+hbfihfbkofcocclmjh...@mail.gmail.com%3e] [2] See https://issues.apache.org/jira/browse/SPARK-22947 for a SPIP that aims to add asof join functionality to Spark. [3][https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html] h2. *API Changes* The public API changes would all be on the PySpark side. In terms of the API itself there are a couple of options depending on whether the goal is syntactic brevity or consistency with the DataSet version of cogroup. If brevity is the aim then a new method can be added to the DataFrame class: {code:java}
# other is another DataFrame, on is the cogroup key, udf is the function to apply.
def cogroup(self, other, on, udf)
{code} Alternatively, to be consistent with the DataSet version of cogroup, a new method could be added to the GroupedData class. {code:java}
# other is another GroupedData, udf is the function to apply.
def cogroup(self, other, udf)
{code} The exact API can be worked out as part of this SPIP and the document will be updated once a decision has been reached. In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify this new type of UDF. Functions annotated with this decorator should take two Pandas DataFrames and return a single Pandas DataFrame. Here is an example of usage, using the as-of join use case described earlier and the first option for the API syntax. 
{code:java}
@pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP)
# df1, df2 and the function return value are all pandas.DataFrames
def asof_join(df1, df2):
    return pd.merge_asof(df1, df2, on='time')

df1.cogroup(df2, on='product_id', apply=asof_join)
{code} h2. *Target Personas* Data scientists, data engineers, library developers. h2. *Scope* * Initial implementation will only consider the case of Cogrouping exactly two DataFrames. Further work may extend this to the case of multiple DataFrames * API call is to be made available via PySpark only. No equivalent R/Java/Scala functionality will be offered. h2. *Design* * New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark * New public method to be added to either GroupedData or DataFrame to expose cogroup in Pyspark * New package private method to be added to RelationGroupedDataset to allow cogroup in Scala * New logical node to be added representing cogroup. * New physical node to be added to implement cogroup. This node will ensure correct partitioning of input DataFrames and create two groupedIterators which will be piped into the Python process for UDF execution. * Extend ArrowPythonRunner such that it can
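The cogroup semantics this SPIP proposes can be illustrated in plain Python, independent of Spark or pandas (all names below are illustrative): group two datasets by a key, then hand each pair of same-key groups to a user function.

```python
# Plain-Python illustration of cogroup: bucket rows from two datasets by
# a shared key, then apply a user function to each pair of key groups.
from collections import defaultdict

def cogroup_apply(left, right, key, func):
    groups = defaultdict(lambda: ([], []))
    for row in left:
        groups[row[key]][0].append(row)   # left-side rows for this key
    for row in right:
        groups[row[key]][1].append(row)   # right-side rows for this key
    out = []
    for k in sorted(groups):
        l, r = groups[k]
        out.extend(func(l, r))            # user function sees both groups
    return out

left = [{"id": 1, "x": 10}, {"id": 2, "x": 20}]
right = [{"id": 1, "y": 100}, {"id": 1, "y": 101}]

# here the user function just reports group sizes per key
pairs = cogroup_apply(left, right, "id",
                      lambda l, r: [(len(l), len(r))])
```

In the proposed PySpark API the user function would receive two pandas DataFrames per key instead of two lists, but the grouping contract is the same.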
[jira] [Created] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs
Chris Martin created SPARK-27463: Summary: SPIP: Support Dataframe Cogroup via Pandas UDFs Key: SPARK-27463 URL: https://issues.apache.org/jira/browse/SPARK-27463 Project: Spark Issue Type: Improvement Components: PySpark, SQL Affects Versions: 3.0.0 Reporter: Chris Martin h2. *Background and Motivation* Recently there has been a great deal of work in PySpark to improve interoperability with the Pandas library. This work has allowed users to write User Defined Functions (UDFs) in Pandas which can then be applied to a Spark DataFrame. The benefit here is that it allows users to combine the functionality of Pandas with the parallelisation abilities of Spark. In addition, these new Pandas UDFs have significantly lower overhead than traditional UDFs as they operate on a batch of data at a time (i.e. they are vectorised) and they use Apache Arrow for serialisation between the JVM and Python processes. As of Spark 2.3 two types of Pandas UDF are offered. Scalar UDFs effectively offer a map operation at the row level, while Grouped Map UDFs allow a map operation on a group of data. This functionality has proved successful in allowing users to integrate Spark with existing Pandas workflows; however, there are situations where the existing functionality offered is not sufficient. One such case is analogous to the existing Cogroup functionality available on RDDs and DataSets and was proposed by Li Jin on the Spark-Dev mailing list [1]. In this case, the user would like to group two Spark DataFrames by a common key and then apply a python function to each group. This python function would take two pandas DataFrames as its arguments and would return an arbitrary-length Pandas DataFrame. 
To give a concrete example of the usefulness of this functionality, consider the use case of performing an as-of join between two distinct DataFrames. This is something that has traditionally been very difficult to do in Spark (and indeed in SQL in general) [2] but which has good support in Pandas [3]. If Cogroup-like functionality were available in PySpark then one could simply write a Pandas function to perform the as-of joining which could then be applied to two (appropriately grouped) DataFrames. This proposal therefore advocates introducing a new API call which would allow for a Cogrouped Pandas UDF. [1][http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3ccagy9duxt569bpgp0wsc2esjgcoo5+hbfihfbkofcocclmjh...@mail.gmail.com%3e] [2] See https://issues.apache.org/jira/browse/SPARK-22947 for a SPIP that aims to add asof join functionality to Spark. [3][https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html] h2. *API Changes* The public API changes would all be on the PySpark side. In terms of the API itself there are a couple of options depending on whether the goal is syntactic brevity or consistency with the DataSet version of cogroup. If brevity is the aim then a new method can be added to the DataFrame class: {code:java}
# other is another DataFrame, on is the cogroup key, udf is the function to apply.
def cogroup(self, other, on, udf)
{code} Alternatively, to be consistent with the DataSet version of cogroup, a new method could be added to the GroupedData class. {code:java}
# other is another GroupedData, udf is the function to apply.
def cogroup(self, other, udf)
{code} The exact API can be worked out as part of this SPIP and the document will be updated once a decision has been reached. In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify this new type of UDF. Functions annotated with this decorator should take two Pandas DataFrames and return a single Pandas DataFrame. 
Here is an example of usage, using the as-of join use case described earlier and the first option for the API syntax. {code:java}
@pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP)
# df1, df2 and the function return value are all pandas.DataFrames
def asof_join(df1, df2):
    return pd.merge_asof(df1, df2, on='time')

df1.cogroup(df2, on='product_id', apply=asof_join)
{code} h2. *Target Personas* Data scientists, data engineers, library developers. h2. *Scope* * Initial implementation will only consider the case of Cogrouping exactly two DataFrames. Further work may extend this to the case of multiple DataFrames * API call is to be made available via PySpark only. No equivalent R/Java/Scala functionality will be offered. h2. *Design* * New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark * New public method to be added to either GroupedData or DataFrame to expose cogroup in Pyspark * New package private method to be added to RelationGroupedDataset to allow cogroup in Scala * New logical node to be added representing cogroup. * New physical node to be added to implement cogroup. This node
[jira] [Created] (SPARK-27462) Spark hive can not choose some columns in target table flexibly, when running insert into.
jiaan.geng created SPARK-27462: -- Summary: Spark hive can not choose some columns in target table flexibly, when running insert into. Key: SPARK-27462 URL: https://issues.apache.org/jira/browse/SPARK-27462 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.4.0, 2.3.0 Reporter: jiaan.geng Spark SQL cannot choose a subset of columns in the target table when running {code:java} insert into tableA select ... from tableB;{code} This feature is supported by Hive, so I think this grammar should be consistent with Hive. Hive supports the following 'insert into' forms: {code:java}
insert into gja_test_spark select * from gja_test;
insert into gja_test_spark(key, value, other) select key, value, other from gja_test;
insert into gja_test_spark(key, value) select value, other from gja_test;
insert into gja_test_spark(key, other) select value, other from gja_test;
insert into gja_test_spark(value, other) select value, other from gja_test;
{code}
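The column-list insert grammar requested here is standard SQL. As an illustration only (not Spark code; table names mirror the examples above), Python's built-in SQLite accepts exactly this form:

```python
# Demonstrate "insert into table(col, ...) select ..." — the grammar the
# issue asks Spark SQL to support — using Python's built-in SQLite.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE gja_test (key TEXT, value TEXT, other TEXT)")
conn.execute("CREATE TABLE gja_test_spark (key TEXT, value TEXT, other TEXT)")
conn.execute("INSERT INTO gja_test VALUES ('k1', 'v1', 'o1')")

# insert into only a subset of the target table's columns;
# the unlisted column ("value") is left NULL
conn.execute(
    "INSERT INTO gja_test_spark(key, other) SELECT value, other FROM gja_test"
)
row = conn.execute(
    "SELECT key, value, other FROM gja_test_spark"
).fetchone()
```

The select list maps positionally onto the declared column list, which is what makes the `(key, other) select value, other` form in the examples above well-defined.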
[jira] [Updated] (SPARK-27461) Not throwing error for Datatype mismatch
[ https://issues.apache.org/jira/browse/SPARK-27461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mahasubramanian Maharajan updated SPARK-27461: -- Priority: Critical (was: Major) > Not throwing error for Datatype mismatch > > > Key: SPARK-27461 > URL: https://issues.apache.org/jira/browse/SPARK-27461 > Project: Spark > Issue Type: Question > Components: SQL >Affects Versions: 2.4.0 >Reporter: Mahasubramanian Maharajan >Priority: Critical >
[jira] [Created] (SPARK-27461) Not throwing error for Datatype mismatch
Mahasubramanian Maharajan created SPARK-27461: - Summary: Not throwing error for Datatype mismatch Key: SPARK-27461 URL: https://issues.apache.org/jira/browse/SPARK-27461 Project: Spark Issue Type: Question Components: SQL Affects Versions: 2.4.0 Reporter: Mahasubramanian Maharajan
[jira] [Resolved] (SPARK-27173) For hive parquet table,codes(lz4,brotli,zstd) are not available
[ https://issues.apache.org/jira/browse/SPARK-27173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] liuxian resolved SPARK-27173. - Resolution: Won't Fix > For hive parquet table,codes(lz4,brotli,zstd) are not available > --- > > Key: SPARK-27173 > URL: https://issues.apache.org/jira/browse/SPARK-27173 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: liuxian >Priority: Minor > > From > _parquet.hadoop.metadata.CompressionCodecName_(parquet-hadoop-bundle-1.6.0.jar > ), we can see that a Hive parquet table only supports *snappy*, *gzip* > and *lzo*