[jira] [Commented] (SPARK-19903) Watermark metadata is lost when using resolved attributes
[ https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610931#comment-16610931 ] Shixiong Zhu commented on SPARK-19903: -- Yes. I removed the target version. > Watermark metadata is lost when using resolved attributes > - > > Key: SPARK-19903 > URL: https://issues.apache.org/jira/browse/SPARK-19903 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 > Environment: Ubuntu Linux >Reporter: Piotr Nestorow >Priority: Major > > PySpark example reads a Kafka stream. There is watermarking set when handling > the data window. The defined query uses output Append mode. > The PySpark engine reports the error: > 'Append output mode not supported when there are streaming aggregations on > streaming DataFrames/DataSets' > The Python example: > --- > {code} > import sys > from pyspark.sql import SparkSession > from pyspark.sql.functions import explode, split, window > if __name__ == "__main__": > if len(sys.argv) != 4: > print(""" > Usage: structured_kafka_wordcount.py > > """, file=sys.stderr) > exit(-1) > bootstrapServers = sys.argv[1] > subscribeType = sys.argv[2] > topics = sys.argv[3] > spark = SparkSession\ > .builder\ > .appName("StructuredKafkaWordCount")\ > .getOrCreate() > # Create DataSet representing the stream of input lines from kafka > lines = spark\ > .readStream\ > .format("kafka")\ > .option("kafka.bootstrap.servers", bootstrapServers)\ > .option(subscribeType, topics)\ > .load()\ > .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") > # Split the lines into words, retaining timestamps > # split() splits each line into an array, and explode() turns the array > into multiple rows > words = lines.select( > explode(split(lines.value, ' ')).alias('word'), > lines.timestamp > ) > # Group the data by window and word and compute the count of each group > windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy( > 
window(words.timestamp, "30 seconds", "30 seconds"), words.word > ).count() > # Start running the query that prints the running counts to the console > query = windowedCounts\ > .writeStream\ > .outputMode('append')\ > .format('console')\ > .option("truncate", "false")\ > .start() > query.awaitTermination() > {code} > The corresponding example in Zeppelin notebook: > {code} > %spark.pyspark > from pyspark.sql.functions import explode, split, window > # Create DataSet representing the stream of input lines from kafka > lines = spark\ > .readStream\ > .format("kafka")\ > .option("kafka.bootstrap.servers", "localhost:9092")\ > .option("subscribe", "words")\ > .load()\ > .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") > # Split the lines into words, retaining timestamps > # split() splits each line into an array, and explode() turns the array into > multiple rows > words = lines.select( > explode(split(lines.value, ' ')).alias('word'), > lines.timestamp > ) > # Group the data by window and word and compute the count of each group > windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy( > window(words.timestamp, "30 seconds", "30 seconds"), words.word > ).count() > # Start running the query that prints the running counts to the console > query = windowedCounts\ > .writeStream\ > .outputMode('append')\ > .format('console')\ > .option("truncate", "false")\ > .start() > query.awaitTermination() > -- > Note that the Scala version of the same example in Zeppelin notebook works > fine: > > import java.sql.Timestamp > import org.apache.spark.sql.streaming.ProcessingTime > import org.apache.spark.sql.functions._ > // Create DataSet representing the stream of input lines from kafka > val lines = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", "localhost:9092") > .option("subscribe", "words") > .load() > // Split the lines into words, retaining timestamps > val words = lines > .selectExpr("CAST(value AS STRING)", 
"CAST(timestamp AS > TIMESTAMP)") > .as[(String, Timestamp)] > .flatMap(line => line._1.split(" ").map(word => (word, line._2))) > .toDF("word",
[jira] [Commented] (SPARK-23425) load data for hdfs file path with wild card usage is not working properly
[ https://issues.apache.org/jira/browse/SPARK-23425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609595#comment-16609595 ] Shixiong Zhu commented on SPARK-23425: -- Added "release-note" label. Previously, when INPATH contained special characters (such as " "), the user had to escape them manually, e.g., use "/a/b/foo%20bar" rather than "/a/b/foo bar", because the latter would throw "URISyntaxException: Illegal character in path at index XX: /a/b/foo bar". After this patch, the above workaround will throw "AnalysisException: LOAD DATA input path does not exist: /a/b/foo%20bar;". The root cause is that we changed from "new URI(user_specified_path)" to "new Path(user_specified_path)". I believe this patch is indeed a bug fix, but it is worth highlighting in the release notes. > load data for hdfs file path with wild card usage is not working properly > - > > Key: SPARK-23425 > URL: https://issues.apache.org/jira/browse/SPARK-23425 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.2.1, 2.3.0 > Reporter: Sujith > Assignee: Sujith > Priority: Major > Labels: release-notes > Fix For: 2.4.0 > > Attachments: wildcard_issue.PNG > > > The load data command for loading data from non-local file paths using wildcard > strings like * is not working > e.g.: > "load data inpath 'hdfs://hacluster/user/ext*' into table t1" > Getting an Analysis exception while executing this query > !image-2018-02-14-23-41-39-923.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
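The escaping workaround described in the comment above can be illustrated with a minimal sketch. It uses Python's standard `urllib.parse` as a stand-in for the Java `URI` and Hadoop `Path` classes named in the comment, so it shows only the general idea (percent-encoding versus literal path handling), not Spark's actual code path. The variable names are illustrative.

```python
from urllib.parse import quote, unquote

# Path containing a space, as in the comment's example.
raw_path = "/a/b/foo bar"

# Old workaround: percent-encode the path so that a strict URI parser accepts it.
escaped_path = quote(raw_path)
print(escaped_path)

# A parser that treats its input as a literal path does no decoding, so the
# escaped string no longer names the same file as the raw string.
print(escaped_path == raw_path)

# Decoding the escaped form recovers the original path.
print(unquote(escaped_path) == raw_path)
```

Under this reading, a caller that still passes the escaped form after the patch is asking for a file literally named `foo%20bar`, which is consistent with the "input path does not exist" error quoted above.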
[jira] [Updated] (SPARK-23425) load data for hdfs file path with wild card usage is not working properly
[ https://issues.apache.org/jira/browse/SPARK-23425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-23425: - Labels: release-notes (was: release) > load data for hdfs file path with wild card usage is not working properly > - > > Key: SPARK-23425 > URL: https://issues.apache.org/jira/browse/SPARK-23425 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.2.1, 2.3.0 > Reporter: Sujith > Assignee: Sujith > Priority: Major > Labels: release-notes > Fix For: 2.4.0 > > Attachments: wildcard_issue.PNG > > > The load data command for loading data from non-local file paths using wildcard > strings like * is not working > e.g.: > "load data inpath 'hdfs://hacluster/user/ext*' into table t1" > Getting an Analysis exception while executing this query > !image-2018-02-14-23-41-39-923.png!
[jira] [Updated] (SPARK-23425) load data for hdfs file path with wild card usage is not working properly
[ https://issues.apache.org/jira/browse/SPARK-23425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-23425: - Labels: release (was: ) > load data for hdfs file path with wild card usage is not working properly > - > > Key: SPARK-23425 > URL: https://issues.apache.org/jira/browse/SPARK-23425 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.2.1, 2.3.0 > Reporter: Sujith > Assignee: Sujith > Priority: Major > Labels: release-notes > Fix For: 2.4.0 > > Attachments: wildcard_issue.PNG > > > The load data command for loading data from non-local file paths using wildcard > strings like * is not working > e.g.: > "load data inpath 'hdfs://hacluster/user/ext*' into table t1" > Getting an Analysis exception while executing this query > !image-2018-02-14-23-41-39-923.png!
[jira] [Reopened] (SPARK-24748) Support for reporting custom metrics via Streaming Query Progress
[ https://issues.apache.org/jira/browse/SPARK-24748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reopened SPARK-24748: -- Assignee: (was: Arun Mahadevan) > Support for reporting custom metrics via Streaming Query Progress > - > > Key: SPARK-24748 > URL: https://issues.apache.org/jira/browse/SPARK-24748 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming > Affects Versions: 2.3.1 > Reporter: Arun Mahadevan > Priority: Major > > Currently the Structured Streaming sources and sinks do not have a way to > report custom metrics. Providing an option to report custom metrics and > making them available via Streaming Query progress can enable sources and sinks > to report custom progress information (e.g., the lag metrics for the Kafka source).
[jira] [Reopened] (SPARK-24863) Report offset lag as a custom metrics for Kafka structured streaming source
[ https://issues.apache.org/jira/browse/SPARK-24863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reopened SPARK-24863: -- Assignee: (was: Arun Mahadevan) > Report offset lag as a custom metrics for Kafka structured streaming source > --- > > Key: SPARK-24863 > URL: https://issues.apache.org/jira/browse/SPARK-24863 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming > Affects Versions: 2.4.0 > Reporter: Arun Mahadevan > Priority: Major > > We can build on top of SPARK-24748 to report offset lag as a custom metric > for the Kafka structured streaming source. > This is the difference between the latest offsets in Kafka at the time the > metrics are reported (just after a micro-batch completes) and the latest > offsets Spark has processed. It can be 0 (or close to 0) if Spark keeps up > with the rate at which messages are ingested into Kafka topics in steady > state.
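The lag definition in the description above (latest offset in Kafka minus latest offset Spark has processed) can be sketched as plain arithmetic over per-partition offsets. This is a hypothetical illustration, not the API proposed in the ticket: the function name `offset_lag`, the dictionary layout, and the choice to treat a partition with no processed offset as processed up to 0 are all assumptions.

```python
def offset_lag(latest_offsets, processed_offsets):
    """Return per-partition lag: latest offset in Kafka minus latest processed offset.

    Both arguments map a partition id to an offset. A partition missing from
    processed_offsets is treated as processed up to offset 0 (an assumption).
    """
    return {
        partition: latest - processed_offsets.get(partition, 0)
        for partition, latest in latest_offsets.items()
    }


# Offsets observed just after a micro-batch completes (made-up numbers).
latest = {0: 120, 1: 95}
processed = {0: 120, 1: 90}

lag = offset_lag(latest, processed)
total_lag = sum(lag.values())
print(lag, total_lag)
```

In steady state, when the query keeps up with ingestion, every per-partition value is 0 or close to it, which matches the behavior the description expects.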
[jira] [Updated] (SPARK-24863) Report offset lag as a custom metrics for Kafka structured streaming source
[ https://issues.apache.org/jira/browse/SPARK-24863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-24863: - Fix Version/s: (was: 2.4.0) > Report offset lag as a custom metrics for Kafka structured streaming source > --- > > Key: SPARK-24863 > URL: https://issues.apache.org/jira/browse/SPARK-24863 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming > Affects Versions: 2.4.0 > Reporter: Arun Mahadevan > Priority: Major > > We can build on top of SPARK-24748 to report offset lag as a custom metric > for the Kafka structured streaming source. > This is the difference between the latest offsets in Kafka at the time the > metrics are reported (just after a micro-batch completes) and the latest > offsets Spark has processed. It can be 0 (or close to 0) if Spark keeps up > with the rate at which messages are ingested into Kafka topics in steady > state.
[jira] [Updated] (SPARK-24748) Support for reporting custom metrics via Streaming Query Progress
[ https://issues.apache.org/jira/browse/SPARK-24748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-24748: - Fix Version/s: (was: 2.4.0) > Support for reporting custom metrics via Streaming Query Progress > - > > Key: SPARK-24748 > URL: https://issues.apache.org/jira/browse/SPARK-24748 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming > Affects Versions: 2.3.1 > Reporter: Arun Mahadevan > Priority: Major > > Currently the Structured Streaming sources and sinks do not have a way to > report custom metrics. Providing an option to report custom metrics and > making them available via Streaming Query progress can enable sources and sinks > to report custom progress information (e.g., the lag metrics for the Kafka source).
[jira] [Updated] (SPARK-25336) Revert SPARK-24863 and SPARK-24748
[ https://issues.apache.org/jira/browse/SPARK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-25336: - Summary: Revert SPARK-24863 and SPARK-24748 (was: Revert SPARK-24863 and SPARK 24748) > Revert SPARK-24863 and SPARK-24748 > -- > > Key: SPARK-25336 > URL: https://issues.apache.org/jira/browse/SPARK-25336 > Project: Spark > Issue Type: Task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Priority: Major > > Revert SPARK-24863 and SPARK 24748. We will revisit them when the data source > v2 APIs are out.
[jira] [Updated] (SPARK-25336) Revert SPARK-24863 and SPARK-24748
[ https://issues.apache.org/jira/browse/SPARK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-25336: - Description: Revert SPARK-24863 and SPARK-24748. We will revisit them when the data source v2 APIs are out. (was: Revert SPARK-24863 and SPARK 24748. We will revisit them when the data source v2 APIs are out.) > Revert SPARK-24863 and SPARK-24748 > -- > > Key: SPARK-25336 > URL: https://issues.apache.org/jira/browse/SPARK-25336 > Project: Spark > Issue Type: Task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Priority: Major > > Revert SPARK-24863 and SPARK-24748. We will revisit them when the data source > v2 APIs are out.
[jira] [Assigned] (SPARK-25336) Revert SPARK-24863 and SPARK-24748
[ https://issues.apache.org/jira/browse/SPARK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-25336: Assignee: Shixiong Zhu > Revert SPARK-24863 and SPARK-24748 > -- > > Key: SPARK-25336 > URL: https://issues.apache.org/jira/browse/SPARK-25336 > Project: Spark > Issue Type: Task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > > Revert SPARK-24863 and SPARK-24748. We will revisit them when the data source > v2 APIs are out.
[jira] [Created] (SPARK-25336) Revert SPARK-24863 and SPARK 24748
Shixiong Zhu created SPARK-25336: Summary: Revert SPARK-24863 and SPARK 24748 Key: SPARK-25336 URL: https://issues.apache.org/jira/browse/SPARK-25336 Project: Spark Issue Type: Task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Shixiong Zhu Revert SPARK-24863 and SPARK 24748. We will revisit them when the data source v2 APIs are out.
[jira] [Commented] (SPARK-25257) v2 MicroBatchReaders can't resume from checkpoints
[ https://issues.apache.org/jira/browse/SPARK-25257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599279#comment-16599279 ] Shixiong Zhu commented on SPARK-25257: -- [~mojodna] This issue has been fixed in SPARK-23092. > v2 MicroBatchReaders can't resume from checkpoints > -- > > Key: SPARK-25257 > URL: https://issues.apache.org/jira/browse/SPARK-25257 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.1 >Reporter: Seth Fitzsimmons >Priority: Major > Attachments: deserialize.patch > > > When resuming from a checkpoint: > {code:java} > writeStream.option("checkpointLocation", > "/tmp/checkpoint").format("console").start > {code} > The stream reader fails with: > {noformat} > osmesa.common.streaming.AugmentedDiffMicroBatchReader@59e19287 > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) > Caused by: java.lang.ClassCastException: > org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to > org.apache.spark.sql.sources.v2.reader.streaming.Offset > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > at > org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) > at > 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) > ... 1 more > {noformat} > The root cause appears to be that the {{SerializedOffset}} (JSON, from disk) > is never deserialized; I would expect to see something along the lines of > {{reader.deserializeOffset(off.json)}} here (unless {{available}} is intended > to be deserialized elsewhere):
[jira] [Resolved] (SPARK-25257) v2 MicroBatchReaders can't resume from checkpoints
[ https://issues.apache.org/jira/browse/SPARK-25257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-25257. -- Resolution: Duplicate > v2 MicroBatchReaders can't resume from checkpoints > -- > > Key: SPARK-25257 > URL: https://issues.apache.org/jira/browse/SPARK-25257 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.1 >Reporter: Seth Fitzsimmons >Priority: Major > Attachments: deserialize.patch > > > When resuming from a checkpoint: > {code:java} > writeStream.option("checkpointLocation", > "/tmp/checkpoint").format("console").start > {code} > The stream reader fails with: > {noformat} > osmesa.common.streaming.AugmentedDiffMicroBatchReader@59e19287 > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) > Caused by: java.lang.ClassCastException: > org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to > org.apache.spark.sql.sources.v2.reader.streaming.Offset > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > 
org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > at > org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) > at > 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) > ... 1 more > {noformat} > The root cause appears to be that the {{SerializedOffset}} (JSON, from disk) > is never deserialized; I would expect to see something along the lines of > {{reader.deserializeOffset(off.json)}} here (unless {{available}} is intended > to be deserialized elsewhere): >
[jira] [Resolved] (SPARK-25288) Kafka transaction tests are flaky
[ https://issues.apache.org/jira/browse/SPARK-25288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-25288. -- Resolution: Fixed Fix Version/s: 2.4.0 > Kafka transaction tests are flaky > - > > Key: SPARK-25288 > URL: https://issues.apache.org/jira/browse/SPARK-25288 > Project: Spark > Issue Type: Bug > Components: Tests >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > Fix For: 2.4.0 > > > http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite_name=read+Kafka+transactional+messages%3A+read_committed > http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite_name=read+Kafka+transactional+messages%3A+read_committed > http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite_name=read+Kafka+transactional+messages%3A+read_committed
[jira] [Updated] (SPARK-25288) Kafka transaction tests are flaky
[ https://issues.apache.org/jira/browse/SPARK-25288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-25288: - Description: http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite_name=read+Kafka+transactional+messages%3A+read_committed http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite_name=read+Kafka+transactional+messages%3A+read_committed http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite_name=read+Kafka+transactional+messages%3A+read_committed > Kafka transaction tests are flaky > - > > Key: SPARK-25288 > URL: https://issues.apache.org/jira/browse/SPARK-25288 > Project: Spark > Issue Type: Bug > Components: Tests >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > > http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite_name=read+Kafka+transactional+messages%3A+read_committed > http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite_name=read+Kafka+transactional+messages%3A+read_committed > http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite_name=read+Kafka+transactional+messages%3A+read_committed -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-25288) Kafka transaction tests are flaky
Shixiong Zhu created SPARK-25288: Summary: Kafka transaction tests are flaky Key: SPARK-25288 URL: https://issues.apache.org/jira/browse/SPARK-25288 Project: Spark Issue Type: Bug Components: Tests Affects Versions: 2.4.0 Reporter: Shixiong Zhu Assignee: Shixiong Zhu
[jira] [Resolved] (SPARK-25005) Structured streaming doesn't support kafka transaction (creating empty offset with abort & markers)
[ https://issues.apache.org/jira/browse/SPARK-25005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-25005. -- Resolution: Fixed Assignee: Shixiong Zhu Fix Version/s: 2.4.0 > Structured streaming doesn't support kafka transaction (creating empty offset > with abort & markers) > --- > > Key: SPARK-25005 > URL: https://issues.apache.org/jira/browse/SPARK-25005 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.3.1 > Reporter: Quentin Ambard > Assignee: Shixiong Zhu > Priority: Major > Fix For: 2.4.0 > > > Structured Streaming can't consume Kafka transactions. > We could try to apply the SPARK-24720 (DStream) logic to the Structured Streaming > source.
[jira] [Resolved] (SPARK-25218) Potential resource leaks in TransportServer and SocketAuthHelper
[ https://issues.apache.org/jira/browse/SPARK-25218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-25218. -- Resolution: Fixed Fix Version/s: 2.4.0 > Potential resource leaks in TransportServer and SocketAuthHelper > > > Key: SPARK-25218 > URL: https://issues.apache.org/jira/browse/SPARK-25218 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Minor > Fix For: 2.4.0 > > > They don't release the resources for all types of errors.
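The kind of leak described above (resources not released for every type of error) is commonly avoided by closing the resource on any failure before re-raising. The following is a generic sketch of that pattern only. It is not the code of `TransportServer` or `SocketAuthHelper`, and `FakeChannel`, `start_server`, and `bind` are hypothetical names introduced for illustration.

```python
class FakeChannel:
    """Stand-in for a resource that must be closed (hypothetical)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def start_server(channel, bind):
    """Run bind(channel); close the channel if initialization fails in any way."""
    try:
        bind(channel)
    except BaseException:
        # Catch every error type, not only the expected ones, so the
        # channel cannot leak, then let the original error propagate.
        channel.close()
        raise
    return channel


def failing_bind(channel):
    raise RuntimeError("bind failed")


leaky_candidate = FakeChannel()
try:
    start_server(leaky_candidate, failing_bind)
except RuntimeError:
    pass
print(leaky_candidate.closed)
```

Catching only a narrow exception type in the `except` clause would leave the channel open for every other failure, which is the situation the one-line description points at.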
[jira] [Resolved] (SPARK-25106) A new Kafka consumer gets created for every batch
[ https://issues.apache.org/jira/browse/SPARK-25106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-25106. -- Resolution: Duplicate Thanks for reporting this. I'm closing this as a duplicate of SPARK-24987 > A new Kafka consumer gets created for every batch > - > > Key: SPARK-25106 > URL: https://issues.apache.org/jira/browse/SPARK-25106 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.3.1 > Reporter: Alexis Seigneurin > Priority: Major > Attachments: console.txt > > > I have a fairly simple piece of code that reads from Kafka, applies some > transformations - including applying a UDF - and writes the result to the > console. Every time a batch is created, a new consumer is created (and not > closed), eventually leading to a "too many open files" error. > I created a test case, with the code available here: > [https://github.com/aseigneurin/spark-kafka-issue] > To reproduce: > # Start Kafka and create a topic called "persons" > # Run "Producer" to generate data > # Run "Consumer" > I am attaching the log where you can see a new consumer being initialized > between every batch. > Please note this issue does *not* appear with Spark 2.2.2, and it does not > appear either when I don't apply the UDF. > I suspect - although I did not go far enough to confirm - that this issue > is related to the improvement made in SPARK-23623.
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-24987:
    Affects Version/s: 2.2.2
                       2.3.0

> Kafka Cached Consumer Leaking File Descriptors
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.2, 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
> Reporter: Yuval Itzchakov
> Assignee: Yuval Itzchakov
> Priority: Critical
> Fix For: 2.3.2, 2.4.0
>
> Setup:
> * Spark 2.3.1
> * Java 1.8.0 (112)
> * Standalone Cluster Manager
> * 3 Nodes, 1 Executor per node.
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
> via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, haven't
> been able to find the root cause as of yet) where cached consumers remain "in
> use" throughout the life time of the task and are never released. This can be
> identified by the following line of the stack trace:
> at org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460)
> Which points to:
> {code:java}
> } else if (existingInternalConsumer.inUse) {
>   // If consumer is already cached but is currently in use, then return a new consumer
>   NonCachedKafkaDataConsumer(newInternalConsumer)
> {code}
> Meaning the existing consumer created for that `TopicPartition` is still in
> use for some reason. The weird thing is that you can see this for very old
> tasks which have already finished successfully.
> I've traced down this leak using file leak detector, attaching it to the
> running Executor JVM process. I've emitted the list of open file descriptors
> which [you can find
> here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
> and you can see that the majority of them are epoll FD used by Kafka
> Consumers, indicating that they aren't closing.
> Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation",
>     sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination(){code}
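The caching behaviour described in the report above can be illustrated with a small, self-contained Python sketch. This is not Spark's implementation: `FakeConsumer`, `ConsumerCache`, `acquire`, `release`, `run_batches` and the key `"topic-0"` are hypothetical names chosen for illustration. It only models the rule quoted from the stack trace (a cached consumer that is still marked in use causes a new, non-cached consumer to be handed out) and shows how a missing release could make the number of open consumers grow with every batch.

```python
class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer; tracks how many are still open."""
    open_count = 0

    def __init__(self, topic_partition):
        self.topic_partition = topic_partition
        self.in_use = False
        self.closed = False
        FakeConsumer.open_count += 1

    def close(self):
        if not self.closed:
            self.closed = True
            FakeConsumer.open_count -= 1


class ConsumerCache:
    """Toy cache keyed by topic partition (an assumption, not Spark's data structure)."""

    def __init__(self):
        self._cache = {}

    def acquire(self, topic_partition):
        existing = self._cache.get(topic_partition)
        if existing is None:
            # Nothing cached yet: create a consumer and cache it.
            consumer = FakeConsumer(topic_partition)
            consumer.in_use = True
            self._cache[topic_partition] = consumer
            return consumer, True
        if existing.in_use:
            # Cached consumer is still marked in use: hand out a new, non-cached one.
            consumer = FakeConsumer(topic_partition)
            consumer.in_use = True
            return consumer, False
        # Cached consumer is idle: reuse it.
        existing.in_use = True
        return existing, True

    def release(self, consumer, cached):
        consumer.in_use = False
        if not cached:
            # Non-cached consumers are closed as soon as they are released.
            consumer.close()


def run_batches(cache, batches, release_after_use):
    """Acquire one consumer per batch, optionally forgetting to release it."""
    for _ in range(batches):
        consumer, cached = cache.acquire("topic-0")
        if release_after_use:
            cache.release(consumer, cached)
```

With `release_after_use=True` a single cached consumer is reused for every batch; with `release_after_use=False` each batch after the first opens another consumer that is never closed, which is the kind of growth the file-descriptor listing in the report suggests.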
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-24987:
    Affects Version/s: (was: 2.3.0)

> Kafka Cached Consumer Leaking File Descriptors
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
> Reporter: Yuval Itzchakov
> Assignee: Yuval Itzchakov
> Priority: Critical
> Fix For: 2.3.2, 2.4.0
[jira] [Resolved] (SPARK-25214) Kafka v2 source may return duplicated records when `failOnDataLoss` is `false`
[ https://issues.apache.org/jira/browse/SPARK-25214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-25214.
    Resolution: Fixed
    Fix Version/s: 2.4.0

> Kafka v2 source may return duplicated records when `failOnDataLoss` is `false`
>
> Key: SPARK-25214
> URL: https://issues.apache.org/jira/browse/SPARK-25214
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Blocker
> Labels: correctness
> Fix For: 2.4.0
>
> When there are missing offsets, Kafka v2 source may return duplicated records
> when failOnDataLoss=false because it doesn't skip missing offsets.
[jira] [Updated] (SPARK-25214) Kafka v2 source may return duplicated records when `failOnDataLoss` is `false`
[ https://issues.apache.org/jira/browse/SPARK-25214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-25214:
    Affects Version/s: (was: 2.3.1)
                       (was: 2.3.0)
                       2.4.0

> Kafka v2 source may return duplicated records when `failOnDataLoss` is `false`
>
> Key: SPARK-25214
> URL: https://issues.apache.org/jira/browse/SPARK-25214
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Blocker
> Labels: correctness
> Fix For: 2.4.0
>
> When there are missing offsets, Kafka v2 source may return duplicated records
> when failOnDataLoss=false because it doesn't skip missing offsets.
[jira] [Created] (SPARK-25218) Potential resource leaks in TransportServer and SocketAuthHelper
Shixiong Zhu created SPARK-25218:

    Summary: Potential resource leaks in TransportServer and SocketAuthHelper
    Key: SPARK-25218
    URL: https://issues.apache.org/jira/browse/SPARK-25218
    Project: Spark
    Issue Type: Bug
    Components: Spark Core
    Affects Versions: 2.4.0
    Reporter: Shixiong Zhu
    Assignee: Shixiong Zhu

They don't release the resources for all types of errors.
[jira] [Updated] (SPARK-25214) Kafka v2 source may return duplicated records when `failOnDataLoss` is `false`
[ https://issues.apache.org/jira/browse/SPARK-25214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-25214:
    Description: When there are missing offsets, Kafka v2 source may return duplicated records when failOnDataLoss=false because it doesn't skip missing offsets.

> Kafka v2 source may return duplicated records when `failOnDataLoss` is `false`
>
> Key: SPARK-25214
> URL: https://issues.apache.org/jira/browse/SPARK-25214
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0, 2.3.1
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Blocker
> Labels: correctness
>
> When there are missing offsets, Kafka v2 source may return duplicated records
> when failOnDataLoss=false because it doesn't skip missing offsets.
[jira] [Created] (SPARK-25214) Kafka v2 source may return duplicated records when `failOnDataLoss` is `false`
Shixiong Zhu created SPARK-25214:

    Summary: Kafka v2 source may return duplicated records when `failOnDataLoss` is `false`
    Key: SPARK-25214
    URL: https://issues.apache.org/jira/browse/SPARK-25214
    Project: Spark
    Issue Type: Bug
    Components: Structured Streaming
    Affects Versions: 2.3.1, 2.3.0
    Reporter: Shixiong Zhu
    Assignee: Shixiong Zhu
[jira] [Resolved] (SPARK-25163) Flaky test: o.a.s.util.collection.ExternalAppendOnlyMapSuite.spilling with compression
[ https://issues.apache.org/jira/browse/SPARK-25163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-25163.
    Resolution: Fixed
    Assignee: Liang-Chi Hsieh
    Fix Version/s: 2.4.0

> Flaky test: o.a.s.util.collection.ExternalAppendOnlyMapSuite.spilling with compression
>
> Key: SPARK-25163
> URL: https://issues.apache.org/jira/browse/SPARK-25163
> Project: Spark
> Issue Type: Bug
> Components: Tests
> Affects Versions: 2.4.0
> Reporter: Shixiong Zhu
> Assignee: Liang-Chi Hsieh
> Priority: Major
> Fix For: 2.4.0
>
> I saw it failed multiple times on Jenkins:
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/4813/testReport/junit/org.apache.spark.util.collection/ExternalAppendOnlyMapSuite/spilling_with_compression/
[jira] [Resolved] (SPARK-25181) Block Manager master and slave thread pools are unbounded
[ https://issues.apache.org/jira/browse/SPARK-25181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-25181.
    Resolution: Fixed
    Assignee: Mukul Murthy
    Fix Version/s: 2.4.0

> Block Manager master and slave thread pools are unbounded
>
> Key: SPARK-25181
> URL: https://issues.apache.org/jira/browse/SPARK-25181
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.0
> Reporter: Mukul Murthy
> Assignee: Mukul Murthy
> Priority: Major
> Fix For: 2.4.0
>
> Currently, BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint both have
> thread pools with unbounded numbers of threads. In certain cases, this can
> lead to driver OOM errors. We should add an upper bound on the number of
> threads in these thread pools; this should not break any existing behavior
> because they still have queues of size Integer.MAX_VALUE.
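The idea in the description above, capping the number of threads while still accepting every submitted task through a large queue, can be sketched in Python. This is an analogy, not the Spark change: it relies on the standard library's `ThreadPoolExecutor`, whose `max_workers` argument bounds the thread count while pending work waits in an unbounded internal queue. The function name `run_with_bound` and the task body are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_bound(num_tasks, max_threads):
    """Run num_tasks on at most max_threads threads; return results and peak concurrency."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def handle(i):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.001)  # simulate a small amount of work
        with lock:
            active -= 1
        return i

    # Tasks beyond max_threads wait in the executor's queue instead of
    # each getting a thread of their own.
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        results = list(pool.map(handle, range(num_tasks)))
    return results, peak
```

Calling `run_with_bound(200, 4)` completes all 200 tasks while never running more than four at once, which mirrors the claim that bounding the pool need not change behaviour as long as the queue can hold the backlog.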
[jira] [Created] (SPARK-25163) Flaky test: o.a.s.util.collection.ExternalAppendOnlyMapSuite.spilling with compression
Shixiong Zhu created SPARK-25163:

    Summary: Flaky test: o.a.s.util.collection.ExternalAppendOnlyMapSuite.spilling with compression
    Key: SPARK-25163
    URL: https://issues.apache.org/jira/browse/SPARK-25163
    Project: Spark
    Issue Type: Bug
    Components: Tests
    Affects Versions: 2.4.0
    Reporter: Shixiong Zhu

I saw it failed multiple times on Jenkins:
https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/4813/testReport/junit/org.apache.spark.util.collection/ExternalAppendOnlyMapSuite/spilling_with_compression/
[jira] [Resolved] (SPARK-25116) Fix the "exit code 1" error when terminating Kafka tests
[ https://issues.apache.org/jira/browse/SPARK-25116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-25116.
    Resolution: Fixed
    Fix Version/s: 2.4.0

> Fix the "exit code 1" error when terminating Kafka tests
>
> Key: SPARK-25116
> URL: https://issues.apache.org/jira/browse/SPARK-25116
> Project: Spark
> Issue Type: Bug
> Components: Tests
> Affects Versions: 2.4.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Major
> Fix For: 2.4.0
>
> SBT may report the following error when all Kafka tests are finished
> {code}
> sbt/sbt/0.13.17/test-interface-1.0.jar sbt.ForkMain 39359 failed with exit code 1
> [error] (sql-kafka-0-10/test:test) sbt.TestsFailedException: Tests unsuccessful
> {code}
> This is because we are leaking a Kafka cluster.
[jira] [Created] (SPARK-25116) Fix the "exit code 1" error when terminating Kafka tests
Shixiong Zhu created SPARK-25116:

    Summary: Fix the "exit code 1" error when terminating Kafka tests
    Key: SPARK-25116
    URL: https://issues.apache.org/jira/browse/SPARK-25116
    Project: Spark
    Issue Type: Bug
    Components: Tests
    Affects Versions: 2.4.0
    Reporter: Shixiong Zhu
    Assignee: Shixiong Zhu

SBT may report the following error when all Kafka tests are finished
{code}
sbt/sbt/0.13.17/test-interface-1.0.jar sbt.ForkMain 39359 failed with exit code 1
[error] (sql-kafka-0-10/test:test) sbt.TestsFailedException: Tests unsuccessful
{code}
This is because we are leaking a Kafka cluster.
[jira] [Updated] (SPARK-24699) Watermark / Append mode should work with Trigger.Once
[ https://issues.apache.org/jira/browse/SPARK-24699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-24699:
    Fix Version/s: 2.4.0

> Watermark / Append mode should work with Trigger.Once
>
> Key: SPARK-24699
> URL: https://issues.apache.org/jira/browse/SPARK-24699
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.1
> Reporter: Chris Horn
> Assignee: Tathagata Das
> Priority: Major
> Fix For: 2.4.0, 3.0.0
> Attachments: watermark-once.scala, watermark-stream.scala
>
> I have a use case where I would like to trigger a structured streaming job
> from an external scheduler (once every 15 minutes or so) and have it write
> window aggregates to Kafka.
> I am able to get my code to work when running with `Trigger.ProcessingTime`
> but when I switch to `Trigger.Once` the watermarking feature of structured
> streams does not persist to (or is not recollected from) the checkpoint state.
> This causes the stream to never generate output because the watermark is
> perpetually stuck at `1970-01-01T00:00:00Z`.
> I have created a failing test case in the `EventTimeWatermarkSuite`; I will
> create a [WIP] pull request on GitHub and link it here.
>
> It seems that even if it generated the watermark, and given the current
> streaming behavior, I would have to trigger the job twice to generate any
> output.
>
> The microbatcher only calculates the watermark off of the previous batch's
> input and emits new aggs based off of that timestamp.
> This state is not available to a newly started `MicroBatchExecution` stream.
> Would it be an appropriate strategy to create a new checkpoint file with the
> most up-to-date watermark, or watermark and query stats?
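The behaviour the reporter describes can be modelled with a toy simulation. The sketch below is an assumption-laden illustration, not Spark's `MicroBatchExecution`: event times are plain integers, `run_once` is a hypothetical function standing in for one `Trigger.Once` run, and the checkpoint is an ordinary dict. It encodes two rules taken from the description: the watermark used by a run comes from the input of the previous run, and a window is only emitted in append mode once its end is at or below that watermark.

```python
def run_once(events, window_size, delay, checkpoint):
    """Simulate one Trigger.Once-style run over integer event times.

    checkpoint may carry 'watermark' and 'state' (window start -> count);
    a missing watermark defaults to 0, i.e. the epoch.
    """
    watermark = checkpoint.get("watermark", 0)
    state = dict(checkpoint.get("state", {}))

    for t in events:
        start = t - t % window_size
        # Keep only events whose window is still open w.r.t. the watermark.
        if start + window_size > watermark:
            state[start] = state.get(start, 0) + 1

    # Append mode: emit windows that the watermark has already passed.
    emitted = {s: c for s, c in state.items() if s + window_size <= watermark}
    for s in emitted:
        del state[s]

    # The watermark for the NEXT run is derived from this run's input.
    new_watermark = max([watermark] + [t - delay for t in events])
    return emitted, {"watermark": new_watermark, "state": state}
```

In this model a first run over `[1, 5, 12, 40]` with 10-unit windows and a delay of 5 emits nothing and leaves a watermark of 35. A second run emits the first two windows only if that watermark is restored from the checkpoint; if it is lost, the watermark starts at 0 again and nothing is ever emitted, which matches the "stuck at 1970" symptom.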
[jira] [Resolved] (SPARK-20919) Simplification of CachedKafkaConsumer.
[ https://issues.apache.org/jira/browse/SPARK-20919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-20919.
    Resolution: Duplicate

> Simplification of CachedKafkaConsumer.
>
> Key: SPARK-20919
> URL: https://issues.apache.org/jira/browse/SPARK-20919
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Prashant Sharma
> Priority: Major
>
> Using an object pool instead of a cache for recycling objects in Kafka
> consumer cache.
[jira] [Resolved] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-25081.
    Resolution: Fixed

> Nested spill in ShuffleExternalSorter may access a released memory page
>
> Key: SPARK-25081
> URL: https://issues.apache.org/jira/browse/SPARK-25081
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0,
> 2.1.1, 2.1.2, 2.1.3, 2.2.0, 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.3.2
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Blocker
> Labels: correctness
> Fix For: 2.4.0, 2.3.3
>
> This issue is pretty similar to SPARK-21907.
> "allocateArray" in
> [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99]
> may trigger a spill and cause ShuffleInMemorySorter to access the released
> `array`. Another task may get the same memory page from the pool. This will
> cause two tasks to access the same memory page. When a task reads memory
> written by another task, many types of failures may happen. Here are some
> examples I have seen:
> - JVM crash. (This is easy to reproduce in a unit test as we fill newly
> allocated and deallocated memory with 0xa5 and 0x5a bytes which usually
> points to an invalid memory address)
> - java.lang.IllegalArgumentException: Comparison method violates its general
> contract!
> - java.lang.NullPointerException at
> org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
> - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size
> -536870912 because the size after growing exceeds size limitation 2147483632
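The hazard described above, a nested spill touching an array that has already been returned to the pool, can be reproduced in miniature. The following Python sketch is a toy model under stated assumptions, not Spark's `ShuffleInMemorySorter`: `PagePool`, `Sorter`, `reset_unsafe` and `reset_safe` are hypothetical names, pages are plain integers, and the "spill" merely checks whether the page it is about to read is currently free. The guarded variant shows one plausible mitigation (dropping the reference before allocating); the description does not state how the issue was actually fixed.

```python
class PagePool:
    """Toy memory pool: freed pages are handed out again to the next caller."""

    def __init__(self):
        self._free = []
        self._next_id = 0

    def allocate(self):
        if self._free:
            return self._free.pop()
        self._next_id += 1
        return self._next_id

    def free(self, page):
        self._free.append(page)

    def is_free(self, page):
        return page in self._free


class Sorter:
    """Toy sorter whose allocation path may trigger a nested spill."""

    def __init__(self, pool):
        self.pool = pool
        self.array = pool.allocate()
        self.stale_reads = 0  # times a spill read a page that was already freed

    def spill(self):
        # A spill reads whatever self.array currently points to.
        if self.array is not None and self.pool.is_free(self.array):
            self.stale_reads += 1

    def _allocate_array(self):
        self.spill()  # assume memory pressure forces a spill during allocation
        return self.pool.allocate()

    def reset_unsafe(self):
        self.pool.free(self.array)           # page returns to the pool...
        self.array = self._allocate_array()  # ...but the nested spill still sees it

    def reset_safe(self):
        old = self.array
        self.array = None                    # drop the reference before allocating
        self.pool.free(old)
        self.array = self._allocate_array()
```

In the unsafe variant the nested spill reads a page that the pool could already have given to another task, which is the situation that leads to the crashes and exceptions listed in the report; in the guarded variant the spill finds no array to read.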
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-25081:
    Affects Version/s: 2.3.2

> Nested spill in ShuffleExternalSorter may access a released memory page
>
> Key: SPARK-25081
> URL: https://issues.apache.org/jira/browse/SPARK-25081
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0,
> 2.1.1, 2.1.2, 2.1.3, 2.2.0, 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.3.2
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Blocker
> Labels: correctness
> Fix For: 2.4.0, 2.3.3
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-25081:
    Fix Version/s: (was: 2.3.2)
                   2.3.3

> Nested spill in ShuffleExternalSorter may access a released memory page
>
> Key: SPARK-25081
> URL: https://issues.apache.org/jira/browse/SPARK-25081
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0,
> 2.1.1, 2.1.2, 2.1.3, 2.2.0, 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.3.2
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Blocker
> Labels: correctness
> Fix For: 2.4.0, 2.3.3
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-25081:
    Priority: Blocker (was: Major)

> Nested spill in ShuffleExternalSorter may access a released memory page
>
> Key: SPARK-25081
> URL: https://issues.apache.org/jira/browse/SPARK-25081
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0,
> 2.1.1, 2.1.2, 2.1.3, 2.2.0, 2.2.1, 2.2.2, 2.3.0, 2.3.1
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Blocker
> Labels: correctness
> Fix For: 2.3.2, 2.4.0
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-25081:
    Affects Version/s: 1.6.2

> Nested spill in ShuffleExternalSorter may access a released memory page
>
> Key: SPARK-25081
> URL: https://issues.apache.org/jira/browse/SPARK-25081
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0,
> 2.1.1, 2.1.2, 2.1.3, 2.2.0, 2.2.1, 2.2.2, 2.3.0, 2.3.1
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Major
> Labels: correctness
> Fix For: 2.3.2, 2.4.0
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-25081:
    Affects Version/s: 1.6.0
                       1.6.1
                       2.0.0
                       2.0.1
                       2.1.0
                       2.1.1
                       2.1.2
                       2.2.0
                       2.2.1

> Nested spill in ShuffleExternalSorter may access a released memory page
>
> Key: SPARK-25081
> URL: https://issues.apache.org/jira/browse/SPARK-25081
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0,
> 2.1.1, 2.1.2, 2.1.3, 2.2.0, 2.2.1, 2.2.2, 2.3.0, 2.3.1
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Major
> Labels: correctness
> Fix For: 2.3.2, 2.4.0
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-25081:
    Affects Version/s: 1.6.3
                       2.0.2
                       2.1.3
                       2.2.2

> Nested spill in ShuffleExternalSorter may access a released memory page
>
> Key: SPARK-25081
> URL: https://issues.apache.org/jira/browse/SPARK-25081
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.3, 2.0.2, 2.1.3, 2.2.2, 2.3.0, 2.3.1
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Major
> Labels: correctness
> Fix For: 2.3.2, 2.4.0
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-25081: - Fix Version/s: 2.4.0 2.3.2 > Nested spill in ShuffleExternalSorter may access a released memory page > > > Key: SPARK-25081 > URL: https://issues.apache.org/jira/browse/SPARK-25081 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > Labels: correctness > Fix For: 2.3.2, 2.4.0 > > > This issue is pretty similar to SPARK-21907. > "allocateArray" in > [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] > may trigger a spill and cause ShuffleInMemorySorter to access the released > `array`. Another task may get the same memory page from the pool. This will > cause two tasks to access the same memory page. When a task reads memory written > by another task, many types of failures may happen. Here are some examples I > have seen: > - JVM crash. (This is easy to reproduce in a unit test as we fill newly > allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually > point to an invalid memory address) > - java.lang.IllegalArgumentException: Comparison method violates its general > contract! > - java.lang.NullPointerException at > org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384) > - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size > -536870912 because the size after growing exceeds size limitation 2147483632
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-25081: - Affects Version/s: 2.3.0 > Nested spill in ShuffleExternalSorter may access a released memory page > > > Key: SPARK-25081 > URL: https://issues.apache.org/jira/browse/SPARK-25081 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > Labels: correctness > Fix For: 2.3.2, 2.4.0 > > > This issue is pretty similar to SPARK-21907. > "allocateArray" in > [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] > may trigger a spill and cause ShuffleInMemorySorter to access the released > `array`. Another task may get the same memory page from the pool. This will > cause two tasks to access the same memory page. When a task reads memory written > by another task, many types of failures may happen. Here are some examples I > have seen: > - JVM crash. (This is easy to reproduce in a unit test as we fill newly > allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually > point to an invalid memory address) > - java.lang.IllegalArgumentException: Comparison method violates its general > contract! > - java.lang.NullPointerException at > org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384) > - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size > -536870912 because the size after growing exceeds size limitation 2147483632
[jira] [Commented] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576636#comment-16576636 ] Shixiong Zhu commented on SPARK-25081: -- That's possible. That's why I added the "correctness" label. > Nested spill in ShuffleExternalSorter may access a released memory page > > > Key: SPARK-25081 > URL: https://issues.apache.org/jira/browse/SPARK-25081 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > Labels: correctness > > This issue is pretty similar to SPARK-21907. > "allocateArray" in > [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] > may trigger a spill and cause ShuffleInMemorySorter to access the released > `array`. Another task may get the same memory page from the pool. This will > cause two tasks to access the same memory page. When a task reads memory written > by another task, many types of failures may happen. Here are some examples I > have seen: > - JVM crash. (This is easy to reproduce in a unit test as we fill newly > allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually > point to an invalid memory address) > - java.lang.IllegalArgumentException: Comparison method violates its general > contract! > - java.lang.NullPointerException at > org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384) > - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size > -536870912 because the size after growing exceeds size limitation 2147483632
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-25081: - Labels: correctness (was: ) > Nested spill in ShuffleExternalSorter may access a released memory page > > > Key: SPARK-25081 > URL: https://issues.apache.org/jira/browse/SPARK-25081 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > Labels: correctness > > This issue is pretty similar to SPARK-21907. > "allocateArray" in > [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] > may trigger a spill and cause ShuffleInMemorySorter to access the released > `array`. Another task may get the same memory page from the pool. This will > cause two tasks to access the same memory page. When a task reads memory written > by another task, many types of failures may happen. Here are some examples I > have seen: > - JVM crash. (This is easy to reproduce in a unit test as we fill newly > allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually > point to an invalid memory address) > - java.lang.IllegalArgumentException: Comparison method violates its general > contract! > - java.lang.NullPointerException at > org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384) > - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size > -536870912 because the size after growing exceeds size limitation 2147483632
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-25081: - Description: This issue is pretty similar to SPARK-21907. "allocateArray" in [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] may trigger a spill and cause ShuffleInMemorySorter to access the released `array`. Another task may get the same memory page from the pool. This will cause two tasks to access the same memory page. When a task reads memory written by another task, many types of failures may happen. Here are some examples I have seen: - JVM crash. (This is easy to reproduce in a unit test as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually point to an invalid memory address) - java.lang.IllegalArgumentException: Comparison method violates its general contract! - java.lang.NullPointerException at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384) - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912 because the size after growing exceeds size limitation 2147483632 was: This issue is pretty similar to SPARK-21907. "allocateArray" in [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] may trigger a spill and cause ShuffleInMemorySorter to access the released `array`. > Nested spill in ShuffleExternalSorter may access a released memory page > > > Key: SPARK-25081 > URL: https://issues.apache.org/jira/browse/SPARK-25081 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > > This issue is pretty similar to SPARK-21907. > "allocateArray" in > [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] > may trigger a spill and cause ShuffleInMemorySorter to access the released > `array`. Another task may get the same memory page from the pool. This will > cause two tasks to access the same memory page. When a task reads memory written > by another task, many types of failures may happen. Here are some examples I > have seen: > - JVM crash. (This is easy to reproduce in a unit test as we fill newly > allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually > point to an invalid memory address) > - java.lang.IllegalArgumentException: Comparison method violates its general > contract! > - java.lang.NullPointerException at > org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384) > - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size > -536870912 because the size after growing exceeds size limitation 2147483632
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-25081: - Description: This issue is pretty similar to SPARK-21907. "allocateArray" in [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] may trigger a spill and cause > Nested spill in ShuffleExternalSorter may access a released memory page > > > Key: SPARK-25081 > URL: https://issues.apache.org/jira/browse/SPARK-25081 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > > This issue is pretty similar to SPARK-21907. > "allocateArray" in > [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] > may trigger a spill and cause
[jira] [Updated] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
[ https://issues.apache.org/jira/browse/SPARK-25081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-25081: - Description: This issue is pretty similar to SPARK-21907. "allocateArray" in [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] may trigger a spill and cause ShuffleInMemorySorter to access the released `array`. was: This issue is pretty similar to SPARK-21907. "allocateArray" in [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] may trigger a spill and cause > Nested spill in ShuffleExternalSorter may access a released memory page > > > Key: SPARK-25081 > URL: https://issues.apache.org/jira/browse/SPARK-25081 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > > This issue is pretty similar to SPARK-21907. > "allocateArray" in > [ShuffleInMemorySorter.reset|https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99] > may trigger a spill and cause ShuffleInMemorySorter to access the released > `array`.
[jira] [Created] (SPARK-25081) Nested spill in ShuffleExternalSorter may access a released memory page
Shixiong Zhu created SPARK-25081: Summary: Nested spill in ShuffleExternalSorter may access a released memory page Key: SPARK-25081 URL: https://issues.apache.org/jira/browse/SPARK-25081 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.1 Reporter: Shixiong Zhu Assignee: Shixiong Zhu
[jira] [Resolved] (SPARK-24161) Enable debug package feature on structured streaming
[ https://issues.apache.org/jira/browse/SPARK-24161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24161. -- Resolution: Fixed Assignee: Jungtaek Lim Fix Version/s: 2.4.0 > Enable debug package feature on structured streaming > > > Key: SPARK-24161 > URL: https://issues.apache.org/jira/browse/SPARK-24161 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Jungtaek Lim >Assignee: Jungtaek Lim >Priority: Major > Fix For: 2.4.0 > > > Currently, the debug package has an implicit class which matches Dataset to > provide debug features on the Dataset class. It doesn't work with structured > streaming: it requires that the query is already started, and the information can be > retrieved from StreamingQuery, not Dataset. For the same reason, "explain" > had to be placed on StreamingQuery even though it exists on Dataset. > This issue tracks the effort to enable the debug package feature on structured > streaming. Unlike batch, it may have some restrictions.
[jira] [Resolved] (SPARK-24896) Uuid expression should produce different values in each execution under streaming query
[ https://issues.apache.org/jira/browse/SPARK-24896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24896. -- Resolution: Fixed Assignee: Liang-Chi Hsieh Fix Version/s: 2.4.0 > Uuid expression should produce different values in each execution under > streaming query > --- > > Key: SPARK-24896 > URL: https://issues.apache.org/jira/browse/SPARK-24896 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 2.3.0, 2.3.1 >Reporter: Liang-Chi Hsieh >Assignee: Liang-Chi Hsieh >Priority: Major > Fix For: 2.4.0 > > > Uuid's results depend on the random seed given during analysis. Thus, under a > streaming query, we will have the same uuids in each execution.
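The effect described in this issue can be reproduced outside Spark with a plain Python sketch. It is only an illustration under stated assumptions: `run_batch`, `fixed_seed_runs`, and `fresh_seed_runs` are hypothetical helpers, each "execution" stands in for one streaming micro-batch, and drawing a fresh seed per execution is one assumed remedy, not necessarily the change that was merged.

```python
import random
import uuid


def run_batch(seed, rows=2):
    """Generate `rows` version-4 UUIDs from a generator seeded with `seed`."""
    rng = random.Random(seed)
    return [str(uuid.UUID(int=rng.getrandbits(128), version=4))
            for _ in range(rows)]


def fixed_seed_runs(seed, executions=2):
    """Seed chosen once (as if during analysis) and reused by every execution."""
    return [run_batch(seed) for _ in range(executions)]


def fresh_seed_runs(master_seed, executions=2):
    """Assumed remedy: draw a new seed for each execution."""
    seeds = random.Random(master_seed)
    return [run_batch(seeds.getrandbits(64)) for _ in range(executions)]
```

With a reused seed, every execution returns the same list of UUIDs, which is the behaviour the issue reports; with a fresh seed per execution the lists differ.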
[jira] [Updated] (SPARK-24896) Uuid expression should produce different values in each execution under streaming query
[ https://issues.apache.org/jira/browse/SPARK-24896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-24896: - Affects Version/s: (was: 2.4.0) 2.3.0 2.3.1 > Uuid expression should produce different values in each execution under > streaming query > --- > > Key: SPARK-24896 > URL: https://issues.apache.org/jira/browse/SPARK-24896 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 2.3.0, 2.3.1 >Reporter: Liang-Chi Hsieh >Assignee: Liang-Chi Hsieh >Priority: Major > Fix For: 2.4.0 > > > Uuid's results depend on the random seed given during analysis. Thus, under a > streaming query, we will have the same uuids in each execution.
[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 0.10.0.1 to 2.0.0
[ https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567010#comment-16567010 ] Shixiong Zhu commented on SPARK-18057: -- [~srowen] Could you create a new ticket for DStreams Kafka? IMO, they are two different modules and don't have to be upgraded in the same version. > Update structured streaming kafka from 0.10.0.1 to 2.0.0 > > > Key: SPARK-18057 > URL: https://issues.apache.org/jira/browse/SPARK-18057 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Reporter: Cody Koeninger >Assignee: Ted Yu >Priority: Blocker > > There are a couple of relevant KIPs here, > https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html
[jira] [Updated] (SPARK-18057) Update structured streaming kafka from 0.10.0.1 to 2.0.0
[ https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-18057: - Summary: Update structured streaming kafka from 0.10.0.1 to 2.0.0 (was: Update structured streaming kafka from 0.10.0.1 to 1.1.0) > Update structured streaming kafka from 0.10.0.1 to 2.0.0 > > > Key: SPARK-18057 > URL: https://issues.apache.org/jira/browse/SPARK-18057 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Reporter: Cody Koeninger >Priority: Major > Fix For: 2.4.0 > > > There are a couple of relevant KIPs here, > https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html
[jira] [Resolved] (SPARK-18057) Update structured streaming kafka from 0.10.0.1 to 2.0.0
[ https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-18057. -- Resolution: Fixed Assignee: Ted Yu Fix Version/s: 2.4.0 > Update structured streaming kafka from 0.10.0.1 to 2.0.0 > > > Key: SPARK-18057 > URL: https://issues.apache.org/jira/browse/SPARK-18057 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Reporter: Cody Koeninger >Assignee: Ted Yu >Priority: Major > Fix For: 2.4.0 > > > There are a couple of relevant KIPs here, > https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html
[jira] [Resolved] (SPARK-24880) Fix the group id for spark-kubernetes-integration-tests
[ https://issues.apache.org/jira/browse/SPARK-24880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24880. -- Resolution: Fixed Fix Version/s: 2.4.0 > Fix the group id for spark-kubernetes-integration-tests > --- > > Key: SPARK-24880 > URL: https://issues.apache.org/jira/browse/SPARK-24880 > Project: Spark > Issue Type: Bug > Components: Build, Kubernetes >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > Fix For: 2.4.0 > > > The correct group id should be `org.apache.spark`. This is causing the > nightly build failure: > https://amplab.cs.berkeley.edu/jenkins/job/spark-master-maven-snapshots/2295/console > {code} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-deploy-plugin:2.8.2:deploy (default-deploy) on > project spark-kubernetes-integration-tests_2.11: Failed to deploy artifacts: > Could not transfer artifact > spark-kubernetes-integration-tests:spark-kubernetes-integration-tests_2.11:jar:2.4.0-20180720.101629-1 > from/to apache.snapshots.https > (https://repository.apache.org/content/repositories/snapshots): Access denied > to: > https://repository.apache.org/content/repositories/snapshots/spark-kubernetes-integration-tests/spark-kubernetes-integration-tests_2.11/2.4.0-SNAPSHOT/spark-kubernetes-integration-tests_2.11-2.4.0-20180720.101629-1.jar, > ReasonPhrase: Forbidden. -> [Help 1] > [ERROR] > {code}
[jira] [Updated] (SPARK-24880) Fix the group id for spark-kubernetes-integration-tests
[ https://issues.apache.org/jira/browse/SPARK-24880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-24880: - Description: The correct group id should be `org.apache.spark`. This is causing the nightly build failure: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-maven-snapshots/2295/console {code} [ERROR] Failed to execute goal org.apache.maven.plugins:maven-deploy-plugin:2.8.2:deploy (default-deploy) on project spark-kubernetes-integration-tests_2.11: Failed to deploy artifacts: Could not transfer artifact spark-kubernetes-integration-tests:spark-kubernetes-integration-tests_2.11:jar:2.4.0-20180720.101629-1 from/to apache.snapshots.https (https://repository.apache.org/content/repositories/snapshots): Access denied to: https://repository.apache.org/content/repositories/snapshots/spark-kubernetes-integration-tests/spark-kubernetes-integration-tests_2.11/2.4.0-SNAPSHOT/spark-kubernetes-integration-tests_2.11-2.4.0-20180720.101629-1.jar, ReasonPhrase: Forbidden. -> [Help 1] [ERROR] {code} > Fix the group id for spark-kubernetes-integration-tests > --- > > Key: SPARK-24880 > URL: https://issues.apache.org/jira/browse/SPARK-24880 > Project: Spark > Issue Type: Bug > Components: Build, Kubernetes >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > > The correct group id should be `org.apache.spark`. This is causing the > nightly build failure: > https://amplab.cs.berkeley.edu/jenkins/job/spark-master-maven-snapshots/2295/console > {code} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-deploy-plugin:2.8.2:deploy (default-deploy) on > project spark-kubernetes-integration-tests_2.11: Failed to deploy artifacts: > Could not transfer artifact > spark-kubernetes-integration-tests:spark-kubernetes-integration-tests_2.11:jar:2.4.0-20180720.101629-1 > from/to apache.snapshots.https > (https://repository.apache.org/content/repositories/snapshots): Access denied > to: > https://repository.apache.org/content/repositories/snapshots/spark-kubernetes-integration-tests/spark-kubernetes-integration-tests_2.11/2.4.0-SNAPSHOT/spark-kubernetes-integration-tests_2.11-2.4.0-20180720.101629-1.jar, > ReasonPhrase: Forbidden. -> [Help 1] > [ERROR] > {code}
[jira] [Created] (SPARK-24880) Fix the group id for spark-kubernetes-integration-tests
Shixiong Zhu created SPARK-24880: Summary: Fix the group id for spark-kubernetes-integration-tests Key: SPARK-24880 URL: https://issues.apache.org/jira/browse/SPARK-24880 Project: Spark Issue Type: Bug Components: Build, Kubernetes Affects Versions: 2.4.0 Reporter: Shixiong Zhu Assignee: Shixiong Zhu
[jira] [Resolved] (SPARK-24566) Fix spark.storage.blockManagerSlaveTimeoutMs default config
[ https://issues.apache.org/jira/browse/SPARK-24566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24566. -- Resolution: Fixed Assignee: xueyu Fix Version/s: 2.4.0 > Fix spark.storage.blockManagerSlaveTimeoutMs default config > --- > > Key: SPARK-24566 > URL: https://issues.apache.org/jira/browse/SPARK-24566 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: xueyu >Assignee: xueyu >Priority: Major > Fix For: 2.4.0 > > > As the configuration doc says, use "spark.network.timeout" in place of > "spark.storage.blockManagerSlaveTimeoutMs" when the latter is not configured.
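The fallback requested in this issue amounts to a two-step configuration lookup, sketched below in plain Python. This is an assumption-laden illustration rather than Spark's implementation: `parse_time_ms` and `block_manager_timeout_ms` are hypothetical helpers, the configuration is modelled as a dict of strings, and the `120s` fallback for "spark.network.timeout" is passed in as a default parameter instead of being read from Spark.

```python
def parse_time_ms(value):
    """Toy parser: '120s', '5000ms', or a bare number of milliseconds."""
    text = str(value).strip().lower()
    if text.endswith("ms"):
        return int(text[:-2])
    if text.endswith("s"):
        return int(text[:-1]) * 1000
    return int(text)


def block_manager_timeout_ms(conf, network_default="120s"):
    """Prefer the specific key; otherwise fall back to the network timeout."""
    key = "spark.storage.blockManagerSlaveTimeoutMs"
    if key in conf:
        return parse_time_ms(conf[key])
    return parse_time_ms(conf.get("spark.network.timeout", network_default))
```

When only "spark.network.timeout" is set, the block manager timeout follows it; once the specific key is set, it takes precedence.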
[jira] [Updated] (SPARK-24566) Fix spark.storage.blockManagerSlaveTimeoutMs default config
[ https://issues.apache.org/jira/browse/SPARK-24566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-24566: - Summary: Fix spark.storage.blockManagerSlaveTimeoutMs default config (was: spark.storage.blockManagerSlaveTimeoutMs default config) > Fix spark.storage.blockManagerSlaveTimeoutMs default config > --- > > Key: SPARK-24566 > URL: https://issues.apache.org/jira/browse/SPARK-24566 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: xueyu >Priority: Major > > As the configuration doc says, use "spark.network.timeout" in place of > "spark.storage.blockManagerSlaveTimeoutMs" when the latter is not configured.
[jira] [Resolved] (SPARK-24451) Spark-Streaming-Kafka-1.6.3- KafkaUtils.createStream function uses the old Logging class
[ https://issues.apache.org/jira/browse/SPARK-24451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24451. -- Resolution: Not A Problem > Spark-Streaming-Kafka-1.6.3- KafkaUtils.createStream function uses the old > Logging class > > > Key: SPARK-24451 > URL: https://issues.apache.org/jira/browse/SPARK-24451 > Project: Spark > Issue Type: Bug > Components: Build, Spark Core >Affects Versions: 2.3.0 > Environment: Spark 2.3.0 > Spark Streaming 2.3.0 > Kafka 1.1.0 > >Reporter: Ankit Prakash Gupta >Priority: Major > Labels: kafka-1.1.0, spark-streaming, spark-streaming-kafka > Attachments: Screenshot from 2018-06-01 23-58-48.png > > > While trying to use the org.apache.spark.streaming.kafka.KafkaUtils.createStream > function to create a stream for a consumer, I came across the error > "java.lang.NoClassDefFoundError: org/apache/spark/Logging". I checked the API > documents and found that the Logging interface was moved to the > "org.apache.spark.internal" package from the "org.apache.spark" package. > > Kindly update the same in the class definitions.
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525290#comment-16525290 ] Shixiong Zhu commented on SPARK-24630: -- Structured Streaming supports the same standard SQL as batch queries, so users can switch their queries between batch and streaming easily. Could you clarify what problems SqlStreaming solves and what the benefits of the new syntax are? > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL (which is actually based on Calcite), > SQLStream, and StormSQL all provide a stream-type SQL interface, with which users > with little knowledge of streaming can easily develop a flow system > processing model. In Spark, we can also support a SQL API based on > Structured Streaming. > To support SQL Streaming, there are two key points: > 1. Analysis should be able to parse streaming-type SQL. > 2. The Analyzer should be able to map metadata information to the corresponding > Relation.
[jira] [Updated] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-24630: - Component/s: (was: SQL) Structured Streaming > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL (which is actually based on Calcite), > SQLStream, and StormSQL all provide a stream-type SQL interface, with which users > with little knowledge of streaming can easily develop a flow system > processing model. In Spark, we can also support a SQL API based on > Structured Streaming. > To support SQL Streaming, there are two key points: > 1. Analysis should be able to parse streaming-type SQL. > 2. The Analyzer should be able to map metadata information to the corresponding > Relation.
[jira] [Assigned] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
    [ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu reassigned SPARK-24578:
------------------------------------

    Assignee: Wenbo Zhao

> Reading remote cache block behavior changes and causes timeout issue
> --------------------------------------------------------------------
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.0, 2.3.1
> Reporter: Wenbo Zhao
> Assignee: Wenbo Zhao
> Priority: Blocker
> Fix For: 2.3.2, 2.4.0
[jira] [Resolved] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
    [ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-24578.
----------------------------------
    Resolution: Fixed
    Fix Version/s: 2.3.2

Issue resolved by pull request 21593
[https://github.com/apache/spark/pull/21593]

> Reading remote cache block behavior changes and causes timeout issue
> --------------------------------------------------------------------
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.0, 2.3.1
> Reporter: Wenbo Zhao
> Priority: Blocker
> Fix For: 2.3.2
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
    [ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-24578:
---------------------------------
    Fix Version/s: 2.4.0

> Reading remote cache block behavior changes and causes timeout issue
> --------------------------------------------------------------------
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.0, 2.3.1
> Reporter: Wenbo Zhao
> Priority: Blocker
> Fix For: 2.3.2, 2.4.0
>
>
> After Spark 2.3, we observed lots of errors like the following in some of our production jobs:
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>   at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
>   at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
>   at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
>   at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
>   at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
>   at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
>   at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
>   at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
>   at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
>   at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
>   at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
>   at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
>   at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
>   at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
>   at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
>   at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
>   at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
>   at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
>   at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>   at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
>   at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
>   at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>
> Here is a small reproducer for a small cluster of 2 executors (say host-1 and host-2), each with 8 cores. The memory of the driver and executors is not an important factor as long as it is big enough, say 20G.
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => println(i);
> df.groupBy("x1").agg(count("value")).show() }
>
[jira] [Resolved] (SPARK-24565) Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame
    [ https://issues.apache.org/jira/browse/SPARK-24565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-24565.
----------------------------------
    Resolution: Fixed
    Fix Version/s: 2.4.0

Issue resolved by pull request 21571
[https://github.com/apache/spark/pull/21571]

> Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame
> ----------------------------------------------------------------------------------------------
>
> Key: SPARK-24565
> URL: https://issues.apache.org/jira/browse/SPARK-24565
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.3.0
> Reporter: Tathagata Das
> Assignee: Tathagata Das
> Priority: Major
> Fix For: 2.4.0
>
>
> Currently, the micro-batches in MicroBatchExecution are not exposed to the user through any public API. This was because we did not want to expose the micro-batches, so that every API we expose could eventually be supported in the Continuous engine. But now that we have a better sense of how to build a ContinuousExecution, I am considering adding APIs that will run only on MicroBatchExecution. I have quite a few use cases where exposing the micro-batch output as a DataFrame is useful.
> - Pass the output rows of each batch to a library that is designed only for batch jobs (for example, many ML libraries need to collect() while learning).
> - Reuse batch data sources for output whose streaming version does not exist (e.g. the Redshift data source).
> - Write the output rows to multiple places by writing twice for each batch. This is not the most elegant thing to do for multiple-output streaming queries, but it is likely to be better than running two streaming queries that process the same data twice.
> The proposal is to add a method {{foreachBatch(f: Dataset[T] => Unit)}} to the Scala/Java/Python {{DataStreamWriter}}.
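The third use case in the description (writing each micro-batch to two places) might look roughly like the PySpark sketch below. It is an illustration, not code from the pull request. It assumes the API as released in Spark 2.4, where the callback receives the batch DataFrame together with a batch id; the handler name `write_batch_twice`, the output paths, and the use of the `rate` source are hypothetical.

```python
# Sketch only: write every micro-batch to two sinks from one streaming query.
# write_batch_twice and both default paths are hypothetical names for illustration.


def write_batch_twice(batch_df, batch_id,
                      primary_path="/tmp/foreach_batch/primary",
                      secondary_path="/tmp/foreach_batch/secondary"):
    """Append one micro-batch to two Parquet locations."""
    # Cache the batch so it is not recomputed for the second write.
    batch_df.persist()
    try:
        batch_df.write.mode("append").parquet(primary_path)
        batch_df.write.mode("append").parquet(secondary_path)
    finally:
        # Release the cached batch even if one of the writes fails.
        batch_df.unpersist()


def main():
    # Imported lazily so the handler above can be exercised without Spark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("ForeachBatchSketch").getOrCreate()

    # The "rate" source emits (timestamp, value) rows and needs no external system.
    stream = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

    # foreachBatch invokes the handler once per micro-batch with (DataFrame, batch id).
    query = stream.writeStream.foreachBatch(write_batch_twice).start()
    query.awaitTermination()
```

Calling `main()` from a driver script would start the query. Because the handler receives an ordinary batch DataFrame, any batch-only data source or library can be used inside it, which is the motivation given in the first two bullets.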
[jira] [Assigned] (SPARK-24235) create the top-of-task RDD sending rows to the remote buffer
    [ https://issues.apache.org/jira/browse/SPARK-24235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu reassigned SPARK-24235:
------------------------------------

    Assignee: Jose Torres

> create the top-of-task RDD sending rows to the remote buffer
> ------------------------------------------------------------
>
> Key: SPARK-24235
> URL: https://issues.apache.org/jira/browse/SPARK-24235
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Jose Torres
> Assignee: Jose Torres
> Priority: Major
> Fix For: 2.4.0
>
>
> [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]
>
> Note that after [https://github.com/apache/spark/pull/21239], this will need to be responsible for incrementing its task's EpochTracker.
[jira] [Resolved] (SPARK-24235) create the top-of-task RDD sending rows to the remote buffer
    [ https://issues.apache.org/jira/browse/SPARK-24235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-24235.
----------------------------------
    Resolution: Fixed
    Fix Version/s: 2.4.0

Issue resolved by pull request 21428
[https://github.com/apache/spark/pull/21428]

> create the top-of-task RDD sending rows to the remote buffer
> ------------------------------------------------------------
>
> Key: SPARK-24235
> URL: https://issues.apache.org/jira/browse/SPARK-24235
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Jose Torres
> Assignee: Jose Torres
> Priority: Major
> Fix For: 2.4.0
>
>
> [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]
>
> Note that after [https://github.com/apache/spark/pull/21239], this will need to be responsible for incrementing its task's EpochTracker.
[jira] [Updated] (SPARK-24466) TextSocketMicroBatchReader no longer works with nc utility
    [ https://issues.apache.org/jira/browse/SPARK-24466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-24466:
---------------------------------
    Target Version/s: 2.4.0

> TextSocketMicroBatchReader no longer works with nc utility
> ----------------------------------------------------------
>
> Key: SPARK-24466
> URL: https://issues.apache.org/jira/browse/SPARK-24466
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Jungtaek Lim
> Priority: Major
>
> While playing with Spark 2.4.0-SNAPSHOT, I found that the nc command exits before the actual data is read, so the query also exits with an error.
>
> The reason is that a temporary reader is launched to read the schema and is then closed, after which the reader is re-opened. While a reliable socket server should be able to handle this without any issue, the nc command normally can't handle multiple connections and simply exits when the temporary reader is closed.
>
> Given that the socket source is expected to be used in the examples in the official documentation or in experiments, for which we tend to simply use netcat, this is better treated as a bug, even though it is arguably a limitation of netcat.
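The description contrasts nc with a "reliable socket server" that survives the temporary schema reader connecting and disconnecting. A minimal sketch of such a server, using only the Python standard library, might look like the following. The names `LineFeedServer`, `start_server` and `read_all`, and the fixed list of lines, are assumptions made for illustration; this is not something Spark or its examples ship.

```python
# Sketch only: a TCP server that keeps accepting connections, unlike a plain
# `nc -lk`-less netcat session that exits after the first client disconnects.
import socket
import socketserver
import threading


class LineFeedHandler(socketserver.BaseRequestHandler):
    """Send every configured line to the connected client, then close."""

    def handle(self):
        for line in self.server.lines:
            self.request.sendall(line.encode("utf-8") + b"\n")


class LineFeedServer(socketserver.ThreadingTCPServer):
    allow_reuse_address = True  # allow quick restarts on the same port
    daemon_threads = True       # handler threads do not block interpreter exit

    def __init__(self, address, lines):
        super().__init__(address, LineFeedHandler)
        self.lines = list(lines)


def start_server(lines, host="127.0.0.1", port=0):
    """Start the server in a background thread; port=0 asks the OS for a free port."""
    server = LineFeedServer((host, port), lines)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def read_all(host, port):
    """Connect, read until the server closes the connection, and return the lines."""
    chunks = []
    with socket.create_connection((host, port), timeout=5) as conn:
        while True:
            data = conn.recv(4096)
            if not data:
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8").splitlines()
```

A first connection (standing in for the temporary schema reader) and a second one (the real reader) are both served, because the server keeps listening after a client disconnects. To try it against the socket source, the `host` and `port` options would point at `server.server_address`.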
[jira] [Commented] (SPARK-24279) Incompatible byte code errors, when using test-jar of spark sql.
    [ https://issues.apache.org/jira/browse/SPARK-24279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503630#comment-16503630 ]

Shixiong Zhu commented on SPARK-24279:
--------------------------------------

I just took a quick look at your pom.xml. I think it's missing the following catalyst test-jar dependency:

{code}
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
{code}

> Incompatible byte code errors, when using test-jar of spark sql.
> ----------------------------------------------------------------
>
> Key: SPARK-24279
> URL: https://issues.apache.org/jira/browse/SPARK-24279
> Project: Spark
> Issue Type: Bug
> Components: SQL, Structured Streaming, Tests
> Affects Versions: 2.3.0, 2.4.0
> Reporter: Prashant Sharma
> Priority: Major
>
> Using the test libraries available with Spark SQL streaming produces weird incompatible byte code errors. This was also tested on a different VirtualBox instance to make sure that it is not related to my system environment. A reproducer is uploaded to GitHub.
> [https://github.com/ScrapCodes/spark-bug-reproducer]
>
> Doing a clean build reproduces the error.
>
> Verbatim paste of the error:
> {code:java}
> [INFO] Compiling 1 source files to /home/prashant/work/test/target/test-classes at 1526380360990
> [ERROR] error: missing or invalid dependency detected while loading class file 'QueryTest.class'.
> [INFO] Could not access type PlanTest in package org.apache.spark.sql.catalyst.plans,
> [INFO] because it (or its dependencies) are missing. Check your build definition for
> [INFO] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
> [INFO] A full rebuild may help if 'QueryTest.class' was compiled against an incompatible version of org.apache.spark.sql.catalyst.plans.
> [ERROR] error: missing or invalid dependency detected while loading class file 'SQLTestUtilsBase.class'.
> [INFO] Could not access type PlanTestBase in package org.apache.spark.sql.catalyst.plans,
> [INFO] because it (or its dependencies) are missing. Check your build definition for
> [INFO] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
> [INFO] A full rebuild may help if 'SQLTestUtilsBase.class' was compiled against an incompatible version of org.apache.spark.sql.catalyst.plans.
> [ERROR] error: missing or invalid dependency detected while loading class file 'SQLTestUtils.class'.
> [INFO] Could not access type PlanTest in package org.apache.spark.sql.catalyst.plans,
> [INFO] because it (or its dependencies) are missing. Check your build definition for
> [INFO] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
> [INFO] A full rebuild may help if 'SQLTestUtils.class' was compiled against an incompatible version of org.apache.spark.sql.catalyst.plans.
> [ERROR] /home/prashant/work/test/src/test/scala/SparkStreamingTests.scala:25: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
> [ERROR] val inputData = MemoryStream[Int]
> [ERROR] ^
> [ERROR] /home/prashant/work/test/src/test/scala/SparkStreamingTests.scala:30: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
> [ERROR] CheckAnswer(2, 3, 4))
> [ERROR] ^
> [ERROR] 5 errors found
> {code}
>
[jira] [Commented] (SPARK-24472) Orc RecordReaderFactory throws IndexOutOfBoundsException
    [ https://issues.apache.org/jira/browse/SPARK-24472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502914#comment-16502914 ]

Shixiong Zhu commented on SPARK-24472:
--------------------------------------

cc [~dongjoon]

> Orc RecordReaderFactory throws IndexOutOfBoundsException
> --------------------------------------------------------
>
> Key: SPARK-24472
> URL: https://issues.apache.org/jira/browse/SPARK-24472
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Shixiong Zhu
> Priority: Major
>
> When the underlying file schema has more columns than the table schema, Orc RecordReaderFactory throws IndexOutOfBoundsException. "spark.sql.hive.convertMetastoreOrc" should be turned off so that HiveTableScanExec is used. Here is a reproducer:
> {code}
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
> Seq(("abc", 123, 123L)).toDF("s", "i", "l").write.partitionBy("i").format("orc").mode("append").save("/tmp/orctest")
> spark.sql("""
> CREATE EXTERNAL TABLE orctest(s string)
> PARTITIONED BY (i int)
> ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
> WITH SERDEPROPERTIES (
>   'serialization.format' = '1'
> )
> STORED AS
>   INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
>   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
> LOCATION '/tmp/orctest'
> """)
> spark.sql("msck repair table orctest")
> spark.sql("set spark.sql.hive.convertMetastoreOrc=false")
> // Exiting paste mode, now interpreting.
> 18/06/05 15:34:52 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> spark.read.format("orc").load("/tmp/orctest").show()
> +---+---+---+
> |  s|  l|  i|
> +---+---+---+
> |abc|123|123|
> +---+---+---+
> scala> spark.sql("select * from orctest").show()
> 18/06/05 15:34:59 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
> java.lang.IndexOutOfBoundsException: toIndex = 2
>   at java.util.ArrayList.subListRangeCheck(ArrayList.java:1004)
>   at java.util.ArrayList.subList(ArrayList.java:996)
>   at org.apache.hadoop.hive.ql.io.orc.RecordReaderFactory.getSchemaOnRead(RecordReaderFactory.java:161)
>   at org.apache.hadoop.hive.ql.io.orc.RecordReaderFactory.createTreeReader(RecordReaderFactory.java:66)
>   at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.<init>(RecordReaderImpl.java:202)
>   at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.rowsOptions(ReaderImpl.java:539)
>   at org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger$ReaderPair.<init>(OrcRawRecordMerger.java:183)
>   at org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger$OriginalReaderPair.<init>(OrcRawRecordMerger.java:226)
>   at org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.<init>(OrcRawRecordMerger.java:437)
>   at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getReader(OrcInputFormat.java:1215)
>   at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1113)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:267)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:266)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at
>
[jira] [Commented] (SPARK-24472) Orc RecordReaderFactory throws IndexOutOfBoundsException
    [ https://issues.apache.org/jira/browse/SPARK-24472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502609#comment-16502609 ]

Shixiong Zhu commented on SPARK-24472:
--------------------------------------

cc [~cloud_fan]

> Orc RecordReaderFactory throws IndexOutOfBoundsException
> --------------------------------------------------------
>
> Key: SPARK-24472
> URL: https://issues.apache.org/jira/browse/SPARK-24472
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Shixiong Zhu
> Priority: Major
[jira] [Created] (SPARK-24472) Orc RecordReaderFactory throws IndexOutOfBoundsException
Shixiong Zhu created SPARK-24472: Summary: Orc RecordReaderFactory throws IndexOutOfBoundsException Key: SPARK-24472 URL: https://issues.apache.org/jira/browse/SPARK-24472 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0 Reporter: Shixiong Zhu When the column number of the underlying file schema is greater than the column number of the table schema, Orc RecordReaderFactory will throw IndexOutOfBoundsException. "spark.sql.hive.convertMetastoreOrc" should be turned off to use HiveTableScanExec. Here is a reproducer: {code} scala> :paste // Entering paste mode (ctrl-D to finish) Seq(("abc", 123, 123L)).toDF("s", "i", "l").write.partitionBy("i").format("orc").mode("append").save("/tmp/orctest") spark.sql(""" CREATE EXTERNAL TABLE orctest(s string) PARTITIONED BY (i int) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' LOCATION '/tmp/orctest' """) spark.sql("msck repair table orctest") spark.sql("set spark.sql.hive.convertMetastoreOrc=false") // Exiting paste mode, now interpreting. 
18/06/05 15:34:52 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException res0: org.apache.spark.sql.DataFrame = [key: string, value: string] scala> spark.read.format("orc").load("/tmp/orctest").show() +---+---+---+ | s| l| i| +---+---+---+ |abc|123|123| +---+---+---+ scala> spark.sql("select * from orctest").show() 18/06/05 15:34:59 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) java.lang.IndexOutOfBoundsException: toIndex = 2 at java.util.ArrayList.subListRangeCheck(ArrayList.java:1004) at java.util.ArrayList.subList(ArrayList.java:996) at org.apache.hadoop.hive.ql.io.orc.RecordReaderFactory.getSchemaOnRead(RecordReaderFactory.java:161) at org.apache.hadoop.hive.ql.io.orc.RecordReaderFactory.createTreeReader(RecordReaderFactory.java:66) at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.(RecordReaderImpl.java:202) at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.rowsOptions(ReaderImpl.java:539) at org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger$ReaderPair.(OrcRawRecordMerger.java:183) at org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger$OriginalReaderPair.(OrcRawRecordMerger.java:226) at org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.(OrcRawRecordMerger.java:437) at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getReader(OrcInputFormat.java:1215) at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1113) at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:267) at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:266) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109)
[jira] [Assigned] (SPARK-24351) offsetLog/commitLog purge thresholdBatchId should be computed with current committed epoch but not currentBatchId in CP mode
[ https://issues.apache.org/jira/browse/SPARK-24351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-24351: Assignee: huangtengfei > offsetLog/commitLog purge thresholdBatchId should be computed with current > committed epoch but not currentBatchId in CP mode > > > Key: SPARK-24351 > URL: https://issues.apache.org/jira/browse/SPARK-24351 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: huangtengfei >Assignee: huangtengfei >Priority: Major > Fix For: 2.4.0 > > > In structured streaming, there is a conf > spark.sql.streaming.minBatchesToRetain which is set to specify 'The minimum > number of batches that must be retained and made recoverable' as described in > [SQLConf|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L802]. > In continuous processing, the metadata purge is triggered when an epoch is > committed in > [ContinuousExecution|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala#L306]. > > Since currentBatchId increases independently in cp mode, the current > committed epoch may be far behind currentBatchId if some task hangs for some > time. It is not safe to discard the metadata with thresholdBatchId computed > based on currentBatchId because we may clean all the metadata in the > checkpoint directory. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24351) offsetLog/commitLog purge thresholdBatchId should be computed with current committed epoch but not currentBatchId in CP mode
[ https://issues.apache.org/jira/browse/SPARK-24351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24351. -- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 21400 [https://github.com/apache/spark/pull/21400] > offsetLog/commitLog purge thresholdBatchId should be computed with current > committed epoch but not currentBatchId in CP mode > > > Key: SPARK-24351 > URL: https://issues.apache.org/jira/browse/SPARK-24351 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: huangtengfei >Assignee: huangtengfei >Priority: Major > Fix For: 2.4.0 > > > In structured streaming, there is a conf > spark.sql.streaming.minBatchesToRetain which is set to specify 'The minimum > number of batches that must be retained and made recoverable' as described in > [SQLConf|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L802]. > In continuous processing, the metadata purge is triggered when an epoch is > committed in > [ContinuousExecution|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala#L306]. > > Since currentBatchId increases independently in cp mode, the current > committed epoch may be far behind currentBatchId if some task hangs for some > time. It is not safe to discard the metadata with thresholdBatchId computed > based on currentBatchId because we may clean all the metadata in the > checkpoint directory.
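The risk described in SPARK-24351 can be illustrated with a small arithmetic sketch. This is a simplified model, not Spark's code: `purgeThreshold` is a hypothetical helper, the retention value of 100 and both ids are made-up numbers, and the rule "entries below the threshold may be purged" is an assumption made for illustration.

```scala
// Simplified, hypothetical model of the purge decision; not Spark's actual code.
object PurgeThresholdSketch {
  // Assumption: log entries with an id below the returned threshold may be purged.
  def purgeThreshold(latestId: Long, minBatchesToRetain: Int): Long =
    latestId - minBatchesToRetain

  def main(args: Array[String]): Unit = {
    val minBatchesToRetain = 100 // illustrative value for spark.sql.streaming.minBatchesToRetain
    val currentBatchId = 500L    // keeps advancing on its own in continuous mode
    val committedEpoch = 120L    // lags behind because some task hung

    val fromBatchId = purgeThreshold(currentBatchId, minBatchesToRetain)
    val fromEpoch = purgeThreshold(committedEpoch, minBatchesToRetain)

    // A threshold above the last committed epoch would cover every committed entry.
    println(s"threshold from currentBatchId:  $fromBatchId (committed epoch is $committedEpoch)")
    println(s"threshold from committed epoch: $fromEpoch")
  }
}
```

With these numbers the threshold derived from currentBatchId lies above the last committed epoch, which is the situation the report warns about; deriving it from the committed epoch keeps the most recent committed entries.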
[jira] [Commented] (SPARK-23649) CSV schema inferring fails on some UTF-8 chars
[ https://issues.apache.org/jira/browse/SPARK-23649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16495843#comment-16495843 ] Shixiong Zhu commented on SPARK-23649: -- [~cloud_fan] looks like this is fixed? > CSV schema inferring fails on some UTF-8 chars > -- > > Key: SPARK-23649 > URL: https://issues.apache.org/jira/browse/SPARK-23649 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Maxim Gekk >Priority: Major > Attachments: utf8xFF.csv > > > Schema inference for CSV files fails if the file contains a character that starts with the byte > *0xFF*. > {code:java} > spark.read.option("header", "true").csv("utf8xFF.csv") > {code} > {code:java} > java.lang.ArrayIndexOutOfBoundsException: 63 > at > org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:191) > at org.apache.spark.unsafe.types.UTF8String.numChars(UTF8String.java:206) > {code} > Here is the content of the file: > {code:java} > hexdump -C ~/tmp/utf8xFF.csv > 63 68 61 6e 6e 65 6c 2c 63 6f 64 65 0d 0a 55 6e |channel,code..Un| > 0010 69 74 65 64 2c 31 32 33 0d 0a 41 42 47 55 4e ff |ited,123..ABGUN.| > 0020 2c 34 35 36 0d|,456.| > 0025 > {code} > Schema inference doesn't fail in multiline mode: > {code} > spark.read.option("header", "true").option("multiline", > "true").csv("utf8xFF.csv") > {code} > {code:java} > +---+-+ > |channel|code > +---+-+ > | United| 123 > | ABGUN�| 456 > +---+-+ > {code} > and Spark is able to read the CSV file if the schema is specified: > {code} > import org.apache.spark.sql.types._ > val schema = new StructType().add("channel", StringType).add("code", > StringType) > spark.read.option("header", "true").schema(schema).csv("utf8xFF.csv").show > {code} > {code:java} > +---++ > |channel|code| > +---++ > | United| 123| > | ABGUN�| 456| > +---++ > {code}
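The index 63 in the exception above matches 0xFF - 0xC0 = 63. The sketch below shows how an unchecked table lookup could fail that way. It rests on assumptions: that the lookup table has one entry per lead byte 0xC0..0xFD (62 entries), and that `unchecked` resembles the failing code path. The `guarded` variant is only one possible defensive check, not necessarily the fix that went into Spark.

```scala
// Hypothetical sketch; the table shape and both helpers are assumptions.
object FirstByteLookupSketch {
  // Assumed table: byte length of a sequence for lead bytes 0xC0..0xFD (62 entries).
  val bytesForLeadByte: Array[Int] =
    Array.fill(32)(2) ++ Array.fill(16)(3) ++ Array.fill(8)(4) ++
      Array.fill(4)(5) ++ Array.fill(2)(6)

  // Unchecked lookup: lead bytes 0xFE and 0xFF index past the end of the table.
  def unchecked(b: Byte): Int = {
    val offset = (b & 0xFF) - 192
    if (offset >= 0) bytesForLeadByte(offset) else 1
  }

  // Guarded lookup: treat any byte outside the table as a single-byte character.
  def guarded(b: Byte): Int = {
    val offset = (b & 0xFF) - 192
    if (offset >= 0 && offset < bytesForLeadByte.length) bytesForLeadByte(offset) else 1
  }

  def main(args: Array[String]): Unit = {
    val bad = 0xFF.toByte
    println(s"offset for 0xFF: ${(bad & 0xFF) - 192}, table length: ${bytesForLeadByte.length}")
    println(s"guarded lookup: ${guarded(bad)}")
  }
}
```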
[jira] [Resolved] (SPARK-24332) Fix places reading 'spark.network.timeout' as milliseconds
[ https://issues.apache.org/jira/browse/SPARK-24332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24332. -- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 21382 [https://github.com/apache/spark/pull/21382] > Fix places reading 'spark.network.timeout' as milliseconds > -- > > Key: SPARK-24332 > URL: https://issues.apache.org/jira/browse/SPARK-24332 > Project: Spark > Issue Type: Improvement > Components: Mesos, Structured Streaming >Affects Versions: 2.3.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > Fix For: 2.4.0 > > > There are some places reading "spark.network.timeout" using "getTimeAsMs" > rather than "getTimeAsSeconds". This will return a wrong value when the user > specifies a value without a time unit.
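Why the missing unit matters can be shown with a stand-in parser. `parseTime` below is a hypothetical helper written for this note, not Spark's implementation; it only models the assumption that a value without a suffix is read in whatever unit the caller asks for, which is the behaviour the report attributes to "getTimeAsMs" and "getTimeAsSeconds".

```scala
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for unit-aware time parsing; not Spark's code.
object TimeoutUnitSketch {
  private val TimeString = "(\\d+)(ms|s)?".r

  // Returns the value expressed in defaultUnit; a bare number is assumed to be in defaultUnit.
  def parseTime(value: String, defaultUnit: TimeUnit): Long =
    value.trim.toLowerCase match {
      case TimeString(num, suffix) =>
        val unit = suffix match {
          case "ms" => TimeUnit.MILLISECONDS
          case "s"  => TimeUnit.SECONDS
          case _    => defaultUnit // no suffix: fall back to the caller's unit
        }
        defaultUnit.convert(num.toLong, unit)
      case _ =>
        throw new NumberFormatException(s"Invalid time string: $value")
    }

  def main(args: Array[String]): Unit = {
    val userValue = "120" // the user meant 120 seconds but gave no unit
    println(s"seconds default:      ${parseTime(userValue, TimeUnit.SECONDS)} s")
    println(s"milliseconds default: ${parseTime(userValue, TimeUnit.MILLISECONDS)} ms")
    println(s"explicit unit:        ${parseTime("120s", TimeUnit.MILLISECONDS)} ms")
  }
}
```

In this model the same setting yields 120 seconds through one reader and 120 milliseconds through the other, while a value with an explicit unit is read consistently by both.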
[jira] [Created] (SPARK-24337) Improve the error message for invalid SQL conf value
Shixiong Zhu created SPARK-24337: Summary: Improve the error message for invalid SQL conf value Key: SPARK-24337 URL: https://issues.apache.org/jira/browse/SPARK-24337 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Shixiong Zhu Right now Spark will throw the following error message when a config is set to an invalid value. It would be great if the error message contains the config key so that it's easy to tell which one is wrong. {code} Size must be specified as bytes (b), kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or pebibytes(p). E.g. 50b, 100k, or 250m. Fractional values are not supported. Input was: 1.6 at org.apache.spark.network.util.JavaUtils.byteStringAs(JavaUtils.java:291) at org.apache.spark.internal.config.ConfigHelpers$.byteFromString(ConfigBuilder.scala:66) at org.apache.spark.internal.config.ConfigBuilder$$anonfun$bytesConf$1.apply(ConfigBuilder.scala:234) at org.apache.spark.internal.config.ConfigBuilder$$anonfun$bytesConf$1.apply(ConfigBuilder.scala:234) at org.apache.spark.sql.internal.SQLConf.setConfString(SQLConf.scala:1300) at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$mergeSparkConf$1.apply(BaseSessionStateBuilder.scala:78) at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$mergeSparkConf$1.apply(BaseSessionStateBuilder.scala:77) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at org.apache.spark.sql.internal.BaseSessionStateBuilder.mergeSparkConf(BaseSessionStateBuilder.scala:77) at org.apache.spark.sql.internal.BaseSessionStateBuilder.conf$lzycompute(BaseSessionStateBuilder.scala:90) at org.apache.spark.sql.internal.BaseSessionStateBuilder.conf(BaseSessionStateBuilder.scala:88) at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:289) at 
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1071) ... 59 more {code}
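One way to get the config key into the message is to wrap the value conversion at the point where the key is still known. The sketch below is an illustration under assumptions: `parseBytes` is a simplified hypothetical converter, `setConf` is a hypothetical setter, and the key "spark.example.sizeConf" is made up. It is not how the eventual Spark change is implemented.

```scala
// Hypothetical sketch of attaching the config key to a conversion error.
object ConfKeyInErrorSketch {
  private val Size = "(\\d+)([bkmg])?".r

  // Simplified byte-size parser that rejects fractional input such as "1.6".
  def parseBytes(value: String): Long =
    value.trim.toLowerCase match {
      case Size(num, suffix) =>
        val shift = suffix match {
          case "k" => 10
          case "m" => 20
          case "g" => 30
          case _   => 0 // "b" or no suffix: plain bytes
        }
        num.toLong << shift
      case _ =>
        throw new NumberFormatException(
          s"Fractional values are not supported. Input was: $value")
    }

  // The setter knows the key, so it can add it and keep the original error as the cause.
  def setConf(key: String, value: String): Long =
    try parseBytes(value)
    catch {
      case e: NumberFormatException =>
        throw new IllegalArgumentException(
          s"Invalid value for config '$key': ${e.getMessage}", e)
    }

  def main(args: Array[String]): Unit = {
    try setConf("spark.example.sizeConf", "1.6")
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```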
[jira] [Created] (SPARK-24332) Fix places reading 'spark.network.timeout' as milliseconds
Shixiong Zhu created SPARK-24332: Summary: Fix places reading 'spark.network.timeout' as milliseconds Key: SPARK-24332 URL: https://issues.apache.org/jira/browse/SPARK-24332 Project: Spark Issue Type: Improvement Components: Mesos, Structured Streaming Affects Versions: 2.3.0 Reporter: Shixiong Zhu Assignee: Shixiong Zhu There are some places reading "spark.network.timeout" using "getTimeAsMs" rather than "getTimeAsSeconds". This will return a wrong value when the user specifies a value without a time unit.
[jira] [Resolved] (SPARK-24159) Enable no-data micro batches for streaming mapGroupswithState
[ https://issues.apache.org/jira/browse/SPARK-24159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24159. -- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 21345 [https://github.com/apache/spark/pull/21345] > Enable no-data micro batches for streaming mapGroupswithState > - > > Key: SPARK-24159 > URL: https://issues.apache.org/jira/browse/SPARK-24159 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Tathagata Das >Assignee: Tathagata Das >Priority: Major > Fix For: 2.4.0 > > > When event-time timeout is enabled, use watermark updates to decide > whether to run another batch. > When processing-time timeout is enabled, use the processing time to > decide when to run more batches.
[jira] [Assigned] (SPARK-24159) Enable no-data micro batches for streaming mapGroupswithState
[ https://issues.apache.org/jira/browse/SPARK-24159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-24159: Assignee: Tathagata Das > Enable no-data micro batches for streaming mapGroupswithState > - > > Key: SPARK-24159 > URL: https://issues.apache.org/jira/browse/SPARK-24159 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Tathagata Das >Assignee: Tathagata Das >Priority: Major > Fix For: 2.4.0 > > > When event-time timeout is enabled, use watermark updates to decide > whether to run another batch. > When processing-time timeout is enabled, use the processing time to > decide when to run more batches.
[jira] [Assigned] (SPARK-20538) Dataset.reduce operator should use withNewExecutionId (as foreach or foreachPartition)
[ https://issues.apache.org/jira/browse/SPARK-20538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-20538: Assignee: Soham Aurangabadkar > Dataset.reduce operator should use withNewExecutionId (as foreach or > foreachPartition) > -- > > Key: SPARK-20538 > URL: https://issues.apache.org/jira/browse/SPARK-20538 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Jacek Laskowski >Assignee: Soham Aurangabadkar >Priority: Trivial > Fix For: 2.4.0 > > > {{Dataset.reduce}} is not tracked using {{executionId}} so it's not displayed > in SQL tab (like {{foreach}} or {{foreachPartition}}).
[jira] [Resolved] (SPARK-20538) Dataset.reduce operator should use withNewExecutionId (as foreach or foreachPartition)
[ https://issues.apache.org/jira/browse/SPARK-20538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-20538. -- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 21316 [https://github.com/apache/spark/pull/21316] > Dataset.reduce operator should use withNewExecutionId (as foreach or > foreachPartition) > -- > > Key: SPARK-20538 > URL: https://issues.apache.org/jira/browse/SPARK-20538 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Jacek Laskowski >Priority: Trivial > Fix For: 2.4.0 > > > {{Dataset.reduce}} is not tracked using {{executionId}} so it's not displayed > in SQL tab (like {{foreach}} or {{foreachPartition}}).
[jira] [Assigned] (SPARK-24040) support single partition aggregates
[ https://issues.apache.org/jira/browse/SPARK-24040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-24040: Assignee: Jose Torres > support single partition aggregates > --- > > Key: SPARK-24040 > URL: https://issues.apache.org/jira/browse/SPARK-24040 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Assignee: Jose Torres >Priority: Major > Fix For: 2.4.0 > > > Single partition aggregates are a useful milestone because they don't involve > a shuffle.
[jira] [Resolved] (SPARK-24040) support single partition aggregates
[ https://issues.apache.org/jira/browse/SPARK-24040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24040. -- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 21239 [https://github.com/apache/spark/pull/21239] > support single partition aggregates > --- > > Key: SPARK-24040 > URL: https://issues.apache.org/jira/browse/SPARK-24040 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Assignee: Jose Torres >Priority: Major > Fix For: 2.4.0 > > > Single partition aggregates are a useful milestone because they don't involve > a shuffle.
[jira] [Commented] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))
[ https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474628#comment-16474628 ] Shixiong Zhu commented on SPARK-24067: -- Okay. Since this is turned off by default, I'm okay with backporting it as it doesn't affect existing jobs. > Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle > Non-consecutive Offsets (i.e. Log Compaction)) > > > Key: SPARK-24067 > URL: https://issues.apache.org/jira/browse/SPARK-24067 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.3.0 >Reporter: Joachim Hereth >Assignee: Cody Koeninger >Priority: Major > Fix For: 2.3.1 > > > SPARK-17147 fixes a problem with non-consecutive Kafka Offsets. The > [PR|https://github.com/apache/spark/pull/20572] was merged to {{master}}. This > should be backported to 2.3. > > Original Description from SPARK-17147 : > > When Kafka does log compaction, offsets often end up with gaps, meaning the > next requested offset will frequently not be offset+1. The logic in > KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset > will always be just an increment of 1 above the previous offset. > I have worked around this problem by changing CachedKafkaConsumer to use the > returned record's offset, from: > {{nextOffset = offset + 1}} > to: > {{nextOffset = record.offset + 1}} > and changed KafkaRDD from: > {{requestOffset += 1}} > to: > {{requestOffset = r.offset() + 1}} > (I also had to change some assert logic in CachedKafkaConsumer). > There's a strong possibility that I have misconstrued how to use the > streaming kafka consumer, and I'm happy to close this out if that's the case. > If, however, it is supposed to support non-consecutive offsets (e.g. due to > log compaction) I am also happy to contribute a PR. 
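The difference between the two offset-advancing strategies quoted above can be seen on a toy partition. The sketch below does not use the Kafka or Spark APIs: `available`, `fetch`, `consecutive` and `gapTolerant` are hypothetical names, and the offsets are made-up data standing in for a compacted log.

```scala
// Toy model of reading a compacted partition; no Kafka or Spark classes involved.
object CompactedOffsetsSketch {
  // Offsets that survived compaction (hypothetical data with gaps).
  val available: Vector[Long] = Vector(0L, 1L, 4L, 5L, 9L)

  // Models a fetch: returns the first surviving offset at or after the requested one.
  def fetch(requested: Long): Option[Long] = available.find(_ >= requested)

  // Old assumption: the next record is always at previous offset + 1.
  def consecutive(from: Long, until: Long): Either[String, Vector[Long]] = {
    var next = from
    val seen = Vector.newBuilder[Long]
    while (next < until) {
      fetch(next) match {
        case Some(o) if o == next =>
          seen += o
          next = next + 1
        case other =>
          return Left(s"expected offset $next but got $other")
      }
    }
    Right(seen.result())
  }

  // Workaround from the description: advance from the offset of the record actually returned.
  def gapTolerant(from: Long, until: Long): Vector[Long] = {
    var next = from
    var done = false
    val seen = Vector.newBuilder[Long]
    while (!done && next < until) {
      fetch(next) match {
        case Some(o) if o < until =>
          seen += o
          next = o + 1
        case _ =>
          done = true
      }
    }
    seen.result()
  }

  def main(args: Array[String]): Unit = {
    println(consecutive(0L, 10L))
    println(gapTolerant(0L, 10L))
  }
}
```

In this model the consecutive reader stops at the first gap, while the gap-tolerant reader returns every surviving record in the requested range.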
[jira] [Commented] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))
[ https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474532#comment-16474532 ] Shixiong Zhu commented on SPARK-24067: -- [~srowen] this sounds more like a new feature to me. The PR itself has a feature flag and it's supposed to support a Kafka feature that we didn't support before. > Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle > Non-consecutive Offsets (i.e. Log Compaction)) > > > Key: SPARK-24067 > URL: https://issues.apache.org/jira/browse/SPARK-24067 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.3.0 >Reporter: Joachim Hereth >Assignee: Cody Koeninger >Priority: Major > Fix For: 2.3.1 > > > SPARK-17147 fixes a problem with non-consecutive Kafka Offsets. The > [PR|https://github.com/apache/spark/pull/20572] was merged to {{master}}. This > should be backported to 2.3. > > Original Description from SPARK-17147 : > > When Kafka does log compaction, offsets often end up with gaps, meaning the > next requested offset will frequently not be offset+1. The logic in > KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset > will always be just an increment of 1 above the previous offset. > I have worked around this problem by changing CachedKafkaConsumer to use the > returned record's offset, from: > {{nextOffset = offset + 1}} > to: > {{nextOffset = record.offset + 1}} > and changed KafkaRDD from: > {{requestOffset += 1}} > to: > {{requestOffset = r.offset() + 1}} > (I also had to change some assert logic in CachedKafkaConsumer). > There's a strong possibility that I have misconstrued how to use the > streaming kafka consumer, and I'm happy to close this out if that's the case. > If, however, it is supposed to support non-consecutive offsets (e.g. due to > log compaction) I am also happy to contribute a PR. 
[jira] [Created] (SPARK-24246) Improve AnalysisException by setting the cause when it's available
Shixiong Zhu created SPARK-24246: Summary: Improve AnalysisException by setting the cause when it's available Key: SPARK-24246 URL: https://issues.apache.org/jira/browse/SPARK-24246 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Shixiong Zhu Assignee: Shixiong Zhu If there is an exception, it's better to set it as the cause of AnalysisException since the exception may contain useful debug information.
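What setting the cause buys can be shown with plain JVM exceptions. In the sketch below, `AnalysisError` is a hypothetical stand-in (it is not Spark's AnalysisException), and `lookupTable` is a made-up function that always fails; the point is only that the second variant keeps the underlying exception reachable through `getCause`.

```scala
// Hypothetical sketch of exception chaining; AnalysisError is not a Spark class.
object CauseChainingSketch {
  class AnalysisError(message: String, cause: Throwable = null)
    extends Exception(message, cause)

  // Made-up lookup that fails with a low-level error.
  def lookupTable(name: String): String =
    throw new java.io.IOException(s"metastore unreachable while resolving $name")

  // Drops the original exception: the stack trace shows only the wrapper.
  def resolveWithoutCause(name: String): String =
    try lookupTable(name)
    catch { case _: Exception => throw new AnalysisError(s"Table not found: $name") }

  // Keeps the original exception as the cause, so it appears as "Caused by: ..." in the trace.
  def resolveWithCause(name: String): String =
    try lookupTable(name)
    catch { case e: Exception => throw new AnalysisError(s"Table not found: $name", e) }

  def main(args: Array[String]): Unit = {
    try resolveWithCause("t")
    catch { case e: AnalysisError => println(s"${e.getMessage}; cause: ${e.getCause}") }
  }
}
```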
[jira] [Resolved] (SPARK-24214) StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON should not fail
[ https://issues.apache.org/jira/browse/SPARK-24214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24214. -- Resolution: Fixed Fix Version/s: 2.3.1 2.4.0 Issue resolved by pull request 21275 [https://github.com/apache/spark/pull/21275] > StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON > should not fail > - > > Key: SPARK-24214 > URL: https://issues.apache.org/jira/browse/SPARK-24214 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > Fix For: 2.4.0, 2.3.1 > > > We should override "otherCopyArgs" to provide the SparkSession parameter; > otherwise TreeNode.toJSON cannot get the full constructor parameter list.
[jira] [Commented] (SPARK-14146) Imported implicits can't be found in Spark REPL in some cases
[ https://issues.apache.org/jira/browse/SPARK-14146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468094#comment-16468094 ] Shixiong Zhu commented on SPARK-14146: -- [~ashawley] Yeah, we can close this ticket when we upgrade Scala 2.11. > Imported implicits can't be found in Spark REPL in some cases > - > > Key: SPARK-14146 > URL: https://issues.apache.org/jira/browse/SPARK-14146 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.0.0 >Reporter: Wenchen Fan >Priority: Major > > {code} > class I(i: Int) { > def double: Int = i * 2 > } > class Context { > implicit def toI(i: Int): I = new I(i) > } > val c = new Context > import c._ > // OK > 1.double > // Fail > class A; 1.double > {code} > The above code snippets work in the Scala REPL, however. > This will affect our Dataset functionality, for example: > {code} > class A; Seq(1 -> "a").toDS() // fail > {code} > or in paste mode: > {code} > :paste > class A > Seq(1 -> "a").toDS() // fail > {code}
[jira] [Created] (SPARK-24214) StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON should not fail
Shixiong Zhu created SPARK-24214: Summary: StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON should not fail Key: SPARK-24214 URL: https://issues.apache.org/jira/browse/SPARK-24214 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.3.0 Reporter: Shixiong Zhu Assignee: Shixiong Zhu We should override "otherCopyArgs" to provide the SparkSession parameter; otherwise TreeNode.toJSON cannot get the full constructor parameter list.
[jira] [Assigned] (SPARK-24061) [SS]TypedFilter is not supported in Continuous Processing
[ https://issues.apache.org/jira/browse/SPARK-24061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-24061: Assignee: Wang Yanlin > [SS]TypedFilter is not supported in Continuous Processing > - > > Key: SPARK-24061 > URL: https://issues.apache.org/jira/browse/SPARK-24061 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wang Yanlin >Assignee: Wang Yanlin >Priority: Major > Fix For: 2.4.0 > > Attachments: TypedFilter_error.png > > > Using filter with a filter function in a continuous processing application causes an > error.
[jira] [Resolved] (SPARK-24061) [SS]TypedFilter is not supported in Continuous Processing
[ https://issues.apache.org/jira/browse/SPARK-24061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-24061. -- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 21136 [https://github.com/apache/spark/pull/21136] > [SS]TypedFilter is not supported in Continuous Processing > - > > Key: SPARK-24061 > URL: https://issues.apache.org/jira/browse/SPARK-24061 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wang Yanlin >Priority: Major > Fix For: 2.4.0 > > Attachments: TypedFilter_error.png > > > Using filter with a filter function in a continuous processing application causes an > error.
[jira] [Assigned] (SPARK-23565) Improved error message for when the number of sources for a query changes
[ https://issues.apache.org/jira/browse/SPARK-23565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-23565: Assignee: Patrick McGloin > Improved error message for when the number of sources for a query changes > - > > Key: SPARK-23565 > URL: https://issues.apache.org/jira/browse/SPARK-23565 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Patrick McGloin >Assignee: Patrick McGloin >Priority: Minor > Fix For: 2.4.0 > > > If you change the number of sources for a Structured Streaming query then you > will get an assertion error as the number of sources in the checkpoint does > not match the number of sources in the query that is starting. This can > happen if, for example, you add a union to the input of the query. This is > of course correct but the error is a bit cryptic and requires investigation. > Suggestion for a more informative error message => > The number of sources for this query has changed. There are [x] sources in > the checkpoint offsets and now there are [y] sources requested by the query. > Cannot continue. > This is the current message. 
> 02-03-2018 13:14:22 ERROR StreamExecution:91 - Query ORPositionsState to Kafka [id = 35f71e63-dbd0-49e9-98b2-a4c72a7da80e, runId = d4439aca-549c-4ef6-872e-29fbfde1df78] terminated with error
> java.lang.AssertionError: assertion failed
>     at scala.Predef$.assert(Predef.scala:156)
>     at org.apache.spark.sql.execution.streaming.OffsetSeq.toStreamProgress(OffsetSeq.scala:38)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:429)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
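The suggestion in SPARK-23565 amounts to replacing a bare assertion with an explicit length check that names both counts. The following is a minimal Python sketch of that idea, not Spark's actual code: `to_stream_progress`, `SourceCountMismatchError`, and the list-based inputs are hypothetical names chosen for illustration, and only the message wording is taken from the report.

```python
class SourceCountMismatchError(Exception):
    """Hypothetical error type; the trace in the report shows a plain AssertionError."""


def to_stream_progress(sources, checkpoint_offsets):
    """Pair each source with its checkpointed offset.

    Fails with the message proposed in the report when the number of
    sources in the query differs from the number recorded in the checkpoint.
    """
    if len(sources) != len(checkpoint_offsets):
        raise SourceCountMismatchError(
            "The number of sources for this query has changed. "
            f"There are {len(checkpoint_offsets)} sources in the checkpoint offsets "
            f"and now there are {len(sources)} sources requested by the query. "
            "Cannot continue."
        )
    # Counts match, so a positional pairing is safe.
    return dict(zip(sources, checkpoint_offsets))
```

Adding a union to the query input would correspond here to passing two sources against a single checkpointed offset, which raises the descriptive error instead of "assertion failed".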
[jira] [Resolved] (SPARK-23565) Improved error message for when the number of sources for a query changes
[ https://issues.apache.org/jira/browse/SPARK-23565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-23565. -- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 20946 [https://github.com/apache/spark/pull/20946] > Improved error message for when the number of sources for a query changes > - > > Key: SPARK-23565 > URL: https://issues.apache.org/jira/browse/SPARK-23565 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Patrick McGloin >Priority: Minor > Fix For: 2.4.0
[jira] [Resolved] (SPARK-23860) SQLAppStatusListener should handle the cases that an accumulator may be GCed
[ https://issues.apache.org/jira/browse/SPARK-23860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-23860. -- Resolution: Duplicate. Already fixed in SPARK-20652. > SQLAppStatusListener should handle the cases that an accumulator may be GCed > - > > Key: SPARK-23860 > URL: https://issues.apache.org/jira/browse/SPARK-23860 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Shixiong Zhu >Priority: Major
>
> "SQLAppStatusListener.onTaskEnd" runs in the event thread. By the time it is called, the Spark job may have already finished and its accumulators may have been GCed. "SQLAppStatusListener.onTaskEnd" should handle this case.
> Here is an example of this failure (SQLAppStatusListener was called SQLListener in 2.2):
> {code}
> 18/03/30 06:49:58 ERROR LiveListenerBus: Listener SQLListener threw an exception
> java.lang.IllegalStateException: Attempted to access garbage collected accumulator 78705157
>     at org.apache.spark.util.AccumulatorContext$$anonfun$get$1.apply(AccumulatorV2.scala:268)
>     at org.apache.spark.util.AccumulatorContext$$anonfun$get$1.apply(AccumulatorV2.scala:264)
>     at scala.Option.map(Option.scala:146)
>     at org.apache.spark.util.AccumulatorContext$.get(AccumulatorV2.scala:264)
>     at org.apache.spark.util.AccumulatorV2$$anonfun$name$1.apply(AccumulatorV2.scala:90)
>     at org.apache.spark.util.AccumulatorV2$$anonfun$name$1.apply(AccumulatorV2.scala:90)
>     at scala.Option.orElse(Option.scala:289)
>     at org.apache.spark.util.AccumulatorV2.name(AccumulatorV2.scala:90)
>     at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:111)
>     at org.apache.spark.sql.execution.ui.SQLListener$$anonfun$onTaskEnd$1.apply(SQLListener.scala:227)
>     at org.apache.spark.sql.execution.ui.SQLListener$$anonfun$onTaskEnd$1.apply(SQLListener.scala:227)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:227)
>     at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
>     at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:42)
>     at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:42)
>     at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:84)
>     at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:42)
>     at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:100)
>     at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:81)
>     at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:81)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:81)
>     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1304)
>     at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:80)
> {code}
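One way a listener can tolerate an accumulator that has already been collected is to treat a failed lookup as "nothing to report" rather than raising. The following is a minimal Python sketch under stated assumptions: `Accumulator`, `AccumulatorRegistry`, and `on_task_end` are hypothetical stand-ins, the registry uses `weakref.WeakValueDictionary` to mimic weakly held accumulators, and this is not claimed to be the change made in SPARK-20652.

```python
import weakref


class Accumulator:
    """Stand-in for a named accumulator; not Spark's AccumulatorV2."""

    def __init__(self, acc_id, name):
        self.acc_id = acc_id
        self.name = name


class AccumulatorRegistry:
    """Holds accumulators weakly, so entries vanish once nothing else references them."""

    def __init__(self):
        self._by_id = weakref.WeakValueDictionary()

    def register(self, acc):
        self._by_id[acc.acc_id] = acc

    def get(self, acc_id):
        # Returns None (instead of raising) once the accumulator has been collected.
        return self._by_id.get(acc_id)


def on_task_end(registry, updates):
    """Convert (accumulator id, value) updates to (name, value), skipping collected ones."""
    infos = []
    for acc_id, value in updates:
        acc = registry.get(acc_id)
        if acc is None:
            # The job may already have finished; drop the update quietly.
            continue
        infos.append((acc.name, value))
    return infos
```

The design choice is that a late task-end event loses only the metrics for accumulators that are already gone, while the listener itself keeps running.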
[jira] [Created] (SPARK-23860) SQLAppStatusListener should handle the cases that an accumulator may be GCed
Shixiong Zhu created SPARK-23860: Summary: SQLAppStatusListener should handle the cases that an accumulator may be GCed Key: SPARK-23860 URL: https://issues.apache.org/jira/browse/SPARK-23860 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0 Reporter: Shixiong Zhu
[jira] [Commented] (SPARK-23848) Structured Streaming fails with nested UDTs
[ https://issues.apache.org/jira/browse/SPARK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423071#comment-16423071 ] Shixiong Zhu commented on SPARK-23848: -- To fix your code, you can just change "case Row(a: Int, b: Array[_]) =>" to "case Row(a: Int, b: Seq[_]) =>". > Structured Streaming fails with nested UDTs > --- > > Key: SPARK-23848 > URL: https://issues.apache.org/jira/browse/SPARK-23848 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Joseph K. Bradley >Priority: Major
>
> While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel with Structured Streaming (for prediction in a streaming job), I ran into a bug which seems to indicate that nested UDTs don't work with streaming.
> Here's a simplified version of the code:
> {code}
> package org.apache.spark.ml.feature
>
> import org.apache.spark.ml.linalg.{Vector, Vectors}
> import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.streaming.StreamTest
>
> class MinHashLSHSuite extends StreamTest {
>   @transient var dataset: Dataset[_] = _
>
>   override def beforeAll(): Unit = {
>     super.beforeAll()
>     val data = {
>       for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, 1.0)))
>     }
>     dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys")
>   }
>
>   test("a test") {
>     val localSpark = spark
>     import localSpark.implicits._
>     val df = Seq[(Int, Array[Vector])](
>       (1, Array(Vectors.dense(1.0, 2.0))),
>       (2, Array(Vectors.dense(1.1, 2.1)))
>     ).toDF("a", "b")
>     df.show() // THIS SUCCEEDS
>     df.collect().foreach(println) // THIS SUCCEEDS
>     testTransformerOnStreamData[(Int, Array[Vector])](df) { rows => // THIS FAILS
>       rows.foreach {
>         case Row(a: Int, b: Array[_]) =>
>       }
>     }
>   }
>
>   def testTransformerOnStreamData[A : Encoder](
>       dataframe: DataFrame)
>       (globalCheckFunction: Seq[Row] => Unit): Unit = {
>     val stream = MemoryStream[A]
>     val streamDF = stream.toDS().toDF("a", "b")
>     val data = dataframe.as[A].collect()
>     val streamOutput = streamDF
>       .select("a", "b")
>     testStream(streamOutput) (
>       AddData(stream, data: _*),
>       CheckAnswer(globalCheckFunction)
>     )
>   }
> }
> {code}
> The streaming test fails with stack trace:
> {code}
> [info] - a test *** FAILED *** (2 seconds, 325 milliseconds)
> [info] scala.MatchError: [1,WrappedArray([1.0,2.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
> [info]
> [info] == Progress ==
> [info] AddData to MemoryStream[_1#24,_2#25]: (1,[Lorg.apache.spark.ml.linalg.Vector;@5abf84a9),(2,[Lorg.apache.spark.ml.linalg.Vector;@4b4198ba)
> [info] => CheckAnswerByFunc
> [info]
> [info] == Stream ==
> [info] Output Mode: Append
> [info] Stream state: {MemoryStream[_1#24,_2#25]: 0}
> [info] Thread state: alive
> [info] Thread stack trace: java.lang.Thread.sleep(Native Method)
> [info] org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163)
> [info] org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> [info] org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
> [info] org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> [info] org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> [info]
> [info]
> [info] == Sink ==
> [info] 0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])]
> [info]
> [info]
> [info] == Plan ==
> [info] == Parsed Logical Plan ==
> [info] Project [a#27, b#28]
> [info] +- Project [_1#24 AS a#27, _2#25 AS b#28]
> [info]    +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
> [info]       +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]
> [info] == Analyzed Logical Plan ==
> [info] a: int, b: array
> [info] Project [a#27, b#28]
> [info] +- Project [_1#24 AS a#27, _2#25 AS b#28]
> [info]    +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
> [info]       +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]
> [info] == Optimized Logical Plan ==
> [info] Project [_1#36 AS a#27, _2#37 AS b#28]
> [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]
> [info] == Physical Plan ==
> [info] *(1) Project [_1#36 AS a#27, _2#37 AS b#28]
> [info] +- *(1) ScanV2
[jira] [Commented] (SPARK-23848) Structured Streaming fails with nested UDTs
[ https://issues.apache.org/jira/browse/SPARK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423070#comment-16423070 ] Shixiong Zhu commented on SPARK-23848: -- [~josephkb] unfortunately, this is because DataFrame is not a type-safe API. It always converts an array to a Seq (the WrappedArray reported in the error message). This is the converter, if you are curious: https://github.com/apache/spark/blob/7fdacbc77bbcf98c2c045a1873e749129769dcc0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L176 > Structured Streaming fails with nested UDTs > --- > > Key: SPARK-23848 > URL: https://issues.apache.org/jira/browse/SPARK-23848 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Joseph K. Bradley >Priority: Major > > While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel > with Structured Streaming (for prediction in a streaming job), I ran into a > bug which seems to indicate that nested UDTs don't work with streaming. 
> Here's a simplified version of the code: > {code} > package org.apache.spark.ml.feature > import org.apache.spark.ml.linalg.{Vector, Vectors} > import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row} > import org.apache.spark.sql.execution.streaming.MemoryStream > import org.apache.spark.sql.streaming.StreamTest > class MinHashLSHSuite extends StreamTest { > @transient var dataset: Dataset[_] = _ > override def beforeAll(): Unit = { > super.beforeAll() > val data = { > for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, > 1.0))) > } > dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") > } > test("a test") { > val localSpark = spark > import localSpark.implicits._ > val df = Seq[(Int, Array[Vector])]( > (1, Array(Vectors.dense(1.0, 2.0))), > (2, Array(Vectors.dense(1.1, 2.1))) > ).toDF("a", "b") > df.show() // THIS SUCCEEDS > df.collect().foreach(println) // THIS SUCCEEDS > testTransformerOnStreamData[(Int, Array[Vector])](df) { rows => // THIS > FAILS > rows.foreach { > case Row(a: Int, b: Array[_]) => > } > } > } > def testTransformerOnStreamData[A : Encoder]( > dataframe: DataFrame) > (globalCheckFunction: Seq[Row] => Unit): Unit = { > val stream = MemoryStream[A] > val streamDF = stream.toDS().toDF("a", "b") > val data = dataframe.as[A].collect() > val streamOutput = streamDF > .select("a", "b") > testStream(streamOutput) ( > AddData(stream, data: _*), > CheckAnswer(globalCheckFunction) > ) > } > } > {code} > The streaming test fails with stack trace: > {code} > [info] - a test *** FAILED *** (2 seconds, 325 milliseconds) > [info] scala.MatchError: [1,WrappedArray([1.0,2.0])] (of class > org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) > [info] > [info] == Progress == > [info] AddData to MemoryStream[_1#24,_2#25]: > (1,[Lorg.apache.spark.ml.linalg.Vector;@5abf84a9),(2,[Lorg.apache.spark.ml.linalg.Vector;@4b4198ba) > [info] => CheckAnswerByFunc > [info] > [info] == Stream == > [info] Output Mode: Append > 
[info] Stream state: {MemoryStream[_1#24,_2#25]: 0} > [info] Thread state: alive > [info] Thread stack trace: java.lang.Thread.sleep(Native Method) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163) > [info] > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131) > [info] > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) > [info] > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) > [info] > [info] > [info] == Sink == > [info] 0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])] > [info] > [info] > [info] == Plan == > [info] == Parsed Logical Plan == > [info] Project [a#27, b#28] > [info] +- Project [_1#24 AS a#27, _2#25 AS b#28] > [info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25] > [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37] > [info] > [info] == Analyzed Logical Plan == > [info] a: int, b: array > [info] Project [a#27, b#28] > [info] +- Project [_1#24 AS a#27, _2#25 AS b#28] > [info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25] > [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37] > [info] > [info] == Optimized Logical Plan