[jira] [Created] (SPARK-47434) Streaming Statistics link redirect causing 302 error
Huw created SPARK-47434: --- Summary: Streaming Statistics link redirect causing 302 error Key: SPARK-47434 URL: https://issues.apache.org/jira/browse/SPARK-47434 Project: Spark Issue Type: Bug Components: Web UI Affects Versions: 3.5.1, 3.4.1 Reporter: Huw Fix For: 3.5.2 When using a reverse proxy, links to the streaming statistics page are missing a trailing slash, which causes a 302 redirect to an incorrect path. Essentially the same issue as https://issues.apache.org/jira/browse/SPARK-24553 but for a different link. .../StreamingQuery/statistics?id=abcd -> .../StreamingQuery/statistics/?id=abcd Linked PR [https://github.com/apache/spark/pull/45527/files] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
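For illustration, a minimal sketch of the link shape the fix amounts to. The helper name and URLs below are hypothetical, not the actual Spark UI code; the point is that emitting the trailing slash up front avoids the 302 redirect in which a reverse proxy's path prefix can get lost.

{code:scala}
object StatisticsLinkSketch {
  // Hypothetical helper: note the trailing slash before the query string.
  def statisticsLink(uiRoot: String, queryId: String): String =
    s"$uiRoot/StreamingQuery/statistics/?id=$queryId"

  def main(args: Array[String]): Unit =
    // prints https://proxy.example.com/spark/StreamingQuery/statistics/?id=abcd
    println(statisticsLink("https://proxy.example.com/spark", "abcd"))
}
{code}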
[jira] [Commented] (SPARK-45583) Spark SQL returning incorrect values for full outer join on keys with the same name.
[ https://issues.apache.org/jira/browse/SPARK-45583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1502#comment-1502 ] Huw commented on SPARK-45583: - Ahh, apologies, it looks like I was running 3.4.1 when I found this issue. Testing in 3.5 it does appear to be resolved. > Spark SQL returning incorrect values for full outer join on keys with the > same name. > > > Key: SPARK-45583 > URL: https://issues.apache.org/jira/browse/SPARK-45583 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 >Reporter: Huw >Priority: Major > Fix For: 3.5.0 > > > {{The following query gives the wrong results.}} > > {{WITH people as (}} > {{ SELECT * FROM (VALUES }} > {{ (1, 'Peter'), }} > {{ (2, 'Homer'), }} > {{ (3, 'Ned'),}} > {{ (3, 'Jenny')}} > {{ ) AS Idiots(id, FirstName)}} > {{{}){}}}{{{}, location as ({}}} > {{ SELECT * FROM (VALUES}} > {{ (1, 'sample0'),}} > {{ (1, 'sample1'),}} > {{ (2, 'sample2') }} > {{ ) as Locations(id, address)}} > {{{}){}}}{{{}SELECT{}}} > {{ *}} > {{FROM}} > {{ people}} > {{FULL OUTER JOIN}} > {{ location}} > {{ON}} > {{ people.id = location.id}} > {{We find the following table:}} > ||id: integer||FirstName: string||id: integer||address: string|| > |2|Homer|2|sample2| > |null|Ned|null|null| > |null|Jenny|null|null| > |1|Peter|1|sample0| > |1|Peter|1|sample1| > {{But clearly the first `id` column is wrong, the nulls should be 3.}} > If we rename the id column in (only) the person table to pid we get the > correct results: > ||pid: integer||FirstName: string||id: integer||address: string|| > |2|Homer|2|sample2| > |3|Ned|null|null| > |3|Jenny|null|null| > |1|Peter|1|sample0| > |1|Peter|1|sample1| -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45583) Spark SQL returning incorrect values for full outer join on keys with the same name.
[ https://issues.apache.org/jira/browse/SPARK-45583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45583: Affects Version/s: 3.4.1 (was: 3.5.0) > Spark SQL returning incorrect values for full outer join on keys with the > same name. > > > Key: SPARK-45583 > URL: https://issues.apache.org/jira/browse/SPARK-45583 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 >Reporter: Huw >Priority: Major > > {{The following query gives the wrong results.}} > > {{WITH people as (}} > {{ SELECT * FROM (VALUES }} > {{ (1, 'Peter'), }} > {{ (2, 'Homer'), }} > {{ (3, 'Ned'),}} > {{ (3, 'Jenny')}} > {{ ) AS Idiots(id, FirstName)}} > {{{}){}}}{{{}, location as ({}}} > {{ SELECT * FROM (VALUES}} > {{ (1, 'sample0'),}} > {{ (1, 'sample1'),}} > {{ (2, 'sample2') }} > {{ ) as Locations(id, address)}} > {{{}){}}}{{{}SELECT{}}} > {{ *}} > {{FROM}} > {{ people}} > {{FULL OUTER JOIN}} > {{ location}} > {{ON}} > {{ people.id = location.id}} > {{We find the following table:}} > ||id: integer||FirstName: string||id: integer||address: string|| > |2|Homer|2|sample2| > |null|Ned|null|null| > |null|Jenny|null|null| > |1|Peter|1|sample0| > |1|Peter|1|sample1| > {{But clearly the first `id` column is wrong, the nulls should be 3.}} > If we rename the id column in (only) the person table to pid we get the > correct results: > ||pid: integer||FirstName: string||id: integer||address: string|| > |2|Homer|2|sample2| > |3|Ned|null|null| > |3|Jenny|null|null| > |1|Peter|1|sample0| > |1|Peter|1|sample1| -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45583) Spark SQL returning incorrect values for full outer join on keys with the same name.
[ https://issues.apache.org/jira/browse/SPARK-45583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45583: Fix Version/s: 3.5.0 > Spark SQL returning incorrect values for full outer join on keys with the > same name. > > > Key: SPARK-45583 > URL: https://issues.apache.org/jira/browse/SPARK-45583 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 >Reporter: Huw >Priority: Major > Fix For: 3.5.0 > > > {{The following query gives the wrong results.}} > > {{WITH people as (}} > {{ SELECT * FROM (VALUES }} > {{ (1, 'Peter'), }} > {{ (2, 'Homer'), }} > {{ (3, 'Ned'),}} > {{ (3, 'Jenny')}} > {{ ) AS Idiots(id, FirstName)}} > {{{}){}}}{{{}, location as ({}}} > {{ SELECT * FROM (VALUES}} > {{ (1, 'sample0'),}} > {{ (1, 'sample1'),}} > {{ (2, 'sample2') }} > {{ ) as Locations(id, address)}} > {{{}){}}}{{{}SELECT{}}} > {{ *}} > {{FROM}} > {{ people}} > {{FULL OUTER JOIN}} > {{ location}} > {{ON}} > {{ people.id = location.id}} > {{We find the following table:}} > ||id: integer||FirstName: string||id: integer||address: string|| > |2|Homer|2|sample2| > |null|Ned|null|null| > |null|Jenny|null|null| > |1|Peter|1|sample0| > |1|Peter|1|sample1| > {{But clearly the first `id` column is wrong, the nulls should be 3.}} > If we rename the id column in (only) the person table to pid we get the > correct results: > ||pid: integer||FirstName: string||id: integer||address: string|| > |2|Homer|2|sample2| > |3|Ned|null|null| > |3|Jenny|null|null| > |1|Peter|1|sample0| > |1|Peter|1|sample1| -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45583) Spark SQL returning incorrect values for full outer join on keys with the same name.
[ https://issues.apache.org/jira/browse/SPARK-45583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45583: Description: {{The following query gives the wrong results.}} {{WITH people as (}} {{ SELECT * FROM (VALUES }} {{ (1, 'Peter'), }} {{ (2, 'Homer'), }} {{ (3, 'Ned'),}} {{ (3, 'Jenny')}} {{ ) AS Idiots(id, FirstName)}} {{{}){}}}{{{}, location as ({}}} {{ SELECT * FROM (VALUES}} {{ (1, 'sample0'),}} {{ (1, 'sample1'),}} {{ (2, 'sample2') }} {{ ) as Locations(id, address)}} {{{}){}}}{{{}SELECT{}}} {{ *}} {{FROM}} {{ people}} {{FULL OUTER JOIN}} {{ location}} {{ON}} {{ people.id = location.id}} {{We find the following table:}} ||id: integer||FirstName: string||id: integer||address: string|| |2|Homer|2|sample2| |null|Ned|null|null| |null|Jenny|null|null| |1|Peter|1|sample0| |1|Peter|1|sample1| {{But clearly the first `id` column is wrong, the nulls should be 3.}} If we rename the id column in (only) the person table to pid we get the correct results: ||pid: integer||FirstName: string||id: integer||address: string|| |2|Homer|2|sample2| |3|Ned|null|null| |3|Jenny|null|null| |1|Peter|1|sample0| |1|Peter|1|sample1| was: {{The following query gives the wrong results.}} {{WITH people as (}} {{ SELECT * FROM (VALUES }} {{ (1, 'Peter'), }} {{ (2, 'Homer'), }} {{ (3, 'Ned'),}} {{ (3, 'Jenny')}} {{ ) AS Idiots(id, FirstName)}} {{{}){}}}{{{}, location as ({}}} {{ SELECT * FROM (VALUES}} {{ (1, 'sample0'),}} {{ (1, 'sample1'),}} {{ (2, 'sample2') }} {{ ) as Locations(id, address)}} {{{}){}}}{{{}SELECT{}}} {{ *}} {{FROM}} {{ people}} {{FULL OUTER JOIN}} {{ location}} {{ON}} {{ people.id = location.id}} {{We find the following table:}} ||id: integer||FirstName: string||id: integer||address: string|| |2|Homer|2|sample2| |null|Ned|null|null| |null|Jenny|null|null| |1|Peter|1|sample0| |1|Peter|1|sample1| {{But clearly the first `id` table is wrong, the nulls should be 3.}} If we rename the id column in (only) the person table to pid we get the correct results: ||pid: integer||FirstName: string||id: integer||address: string|| |2|Homer|2|sample2| |3|Ned|null|null| |3|Jenny|null|null| |1|Peter|1|sample0| |1|Peter|1|sample1| > Spark SQL returning incorrect values for full outer join on keys with the > same name. 
> > > Key: SPARK-45583 > URL: https://issues.apache.org/jira/browse/SPARK-45583 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Huw >Priority: Major > > {{The following query gives the wrong results.}} > > {{WITH people as (}} > {{ SELECT * FROM (VALUES }} > {{ (1, 'Peter'), }} > {{ (2, 'Homer'), }} > {{ (3, 'Ned'),}} > {{ (3, 'Jenny')}} > {{ ) AS Idiots(id, FirstName)}} > {{{}){}}}{{{}, location as ({}}} > {{ SELECT * FROM (VALUES}} > {{ (1, 'sample0'),}} > {{ (1, 'sample1'),}} > {{ (2, 'sample2') }} > {{ ) as Locations(id, address)}} > {{{}){}}}{{{}SELECT{}}} > {{ *}} > {{FROM}} > {{ people}} > {{FULL OUTER JOIN}} > {{ location}} > {{ON}} > {{ people.id = location.id}} > {{We find the following table:}} > ||id: integer||FirstName: string||id: integer||address: string|| > |2|Homer|2|sample2| > |null|Ned|null|null| > |null|Jenny|null|null| > |1|Peter|1|sample0| > |1|Peter|1|sample1| > {{But clearly the first `id` column is wrong, the nulls should be 3.}} > If we rename the id column in (only) the person table to pid we get the > correct results: > ||pid: integer||FirstName: string||id: integer||address: string|| > |2|Homer|2|sample2| > |3|Ned|null|null| > |3|Jenny|null|null| > |1|Peter|1|sample0| > |1|Peter|1|sample1| -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45583) Spark SQL returning incorrect values for full outer join on keys with the same name.
Huw created SPARK-45583: --- Summary: Spark SQL returning incorrect values for full outer join on keys with the same name. Key: SPARK-45583 URL: https://issues.apache.org/jira/browse/SPARK-45583 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.0 Reporter: Huw {{The following query gives the wrong results.}} {{WITH people as (}} {{ SELECT * FROM (VALUES }} {{ (1, 'Peter'), }} {{ (2, 'Homer'), }} {{ (3, 'Ned'),}} {{ (3, 'Jenny')}} {{ ) AS Idiots(id, FirstName)}} {{), location as (}} {{ SELECT * FROM (VALUES}} {{ (1, 'sample0'),}} {{ (1, 'sample1'),}} {{ (2, 'sample2') }} {{ ) as Locations(id, address)}} {{) SELECT}} {{ *}} {{FROM}} {{ people}} {{FULL OUTER JOIN}} {{ location}} {{ON}} {{ people.id = location.id}} {{We find the following table:}} ||id: integer||FirstName: string||id: integer||address: string|| |2|Homer|2|sample2| |null|Ned|null|null| |null|Jenny|null|null| |1|Peter|1|sample0| |1|Peter|1|sample1| {{But clearly the first `id` column is wrong, the nulls should be 3.}} If we rename the id column in (only) the person table to pid we get the correct results: ||pid: integer||FirstName: string||id: integer||address: string|| |2|Homer|2|sample2| |3|Ned|null|null| |3|Jenny|null|null| |1|Peter|1|sample0| |1|Peter|1|sample1| -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
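For reference, a spark-shell sketch of the reported behaviour using the DataFrame API rather than the SQL above, so it may not exercise exactly the same resolution path. It assumes an existing SparkSession named spark; the data mirrors the report.

{code:scala}
import spark.implicits._
import org.apache.spark.sql.functions.col

val people = Seq((1, "Peter"), (2, "Homer"), (3, "Ned"), (3, "Jenny"))
  .toDF("id", "FirstName")
val location = Seq((1, "sample0"), (1, "sample1"), (2, "sample2"))
  .toDF("id", "address")

// Both join keys are named `id`; the report observes the left `id` coming back
// null for the unmatched rows (Ned, Jenny) instead of 3 on the affected version.
people.join(location, people("id") === location("id"), "full_outer").show()

// The rename workaround from the report: no column-name collision, correct result.
people.withColumnRenamed("id", "pid")
  .join(location, col("pid") === col("id"), "full_outer")
  .show()
{code}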
[jira] [Updated] (SPARK-45583) Spark SQL returning incorrect values for full outer join on keys with the same name.
[ https://issues.apache.org/jira/browse/SPARK-45583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45583: Description: {{The following query gives the wrong results.}} {{WITH people as (}} {{ SELECT * FROM (VALUES }} {{ (1, 'Peter'), }} {{ (2, 'Homer'), }} {{ (3, 'Ned'),}} {{ (3, 'Jenny')}} {{ ) AS Idiots(id, FirstName)}} {{{}){}}}{{{}, location as ({}}} {{ SELECT * FROM (VALUES}} {{ (1, 'sample0'),}} {{ (1, 'sample1'),}} {{ (2, 'sample2') }} {{ ) as Locations(id, address)}} {{{}){}}}{{{}SELECT{}}} {{ *}} {{FROM}} {{ people}} {{FULL OUTER JOIN}} {{ location}} {{ON}} {{ people.id = location.id}} {{We find the following table:}} ||id: integer||FirstName: string||id: integer||address: string|| |2|Homer|2|sample2| |null|Ned|null|null| |null|Jenny|null|null| |1|Peter|1|sample0| |1|Peter|1|sample1| {{But clearly the first `id` table is wrong, the nulls should be 3.}} If we rename the id column in (only) the person table to pid we get the correct results: ||pid: integer||FirstName: string||id: integer||address: string|| |2|Homer|2|sample2| |3|Ned|null|null| |3|Jenny|null|null| |1|Peter|1|sample0| |1|Peter|1|sample1| was: {{The following query gives the wrong results.}} {{WITH people as (}} {{ SELECT * FROM (VALUES }} {{ (1, 'Peter'), }} {{ (2, 'Homer'), }} {{ (3, 'Ned'),}} {{ (3, 'Jenny')}} {{ ) AS Idiots(id, FirstName)}} {{{}){}}}{{{}, location as ({}}} {{ SELECT * FROM (VALUES}} {{ (1, 'sample0'),}} {{ (1, 'sample1'),}} {{ (2, 'sample2') }} {{ ) as Locations(id, address)}} {{{}){}}}{{{}SELECT{}}} {{ *}} {{FROM}} {{ people}} {{FULL OUTER JOIN}} {{ location}} {{ON}} {{ people.id = location.id}} {{We find the following table:}} {{}} ||idinteger||FirstNamestring||idinteger||addressstring|| |2|Homer|2|sample2| |null|Ned|null|null| |null|Jenny|null|null| |1|Peter|1|sample0| |1|Peter|1|sample1| {{But clearly the first `id` table is wrong, the nulls should be 3.}} If we rename the id column in (only) the person table to pid we get the correct results: ||pidinteger||FirstNamestring||idinteger||addressstring|| |2|Homer|2|sample2| |3|Ned|null|null| |3|Jenny|null|null| |1|Peter|1|sample0| |1|Peter|1|sample1| > Spark SQL returning incorrect values for full outer join on keys with the > same name. 
> > > Key: SPARK-45583 > URL: https://issues.apache.org/jira/browse/SPARK-45583 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Huw >Priority: Major > > {{The following query gives the wrong results.}} > > {{WITH people as (}} > {{ SELECT * FROM (VALUES }} > {{ (1, 'Peter'), }} > {{ (2, 'Homer'), }} > {{ (3, 'Ned'),}} > {{ (3, 'Jenny')}} > {{ ) AS Idiots(id, FirstName)}} > {{{}){}}}{{{}, location as ({}}} > {{ SELECT * FROM (VALUES}} > {{ (1, 'sample0'),}} > {{ (1, 'sample1'),}} > {{ (2, 'sample2') }} > {{ ) as Locations(id, address)}} > {{{}){}}}{{{}SELECT{}}} > {{ *}} > {{FROM}} > {{ people}} > {{FULL OUTER JOIN}} > {{ location}} > {{ON}} > {{ people.id = location.id}} > {{We find the following table:}} > ||id: integer||FirstName: string||id: integer||address: string|| > |2|Homer|2|sample2| > |null|Ned|null|null| > |null|Jenny|null|null| > |1|Peter|1|sample0| > |1|Peter|1|sample1| > {{But clearly the first `id` table is wrong, the nulls should be 3.}} > If we rename the id column in (only) the person table to pid we get the > correct results: > ||pid: integer||FirstName: string||id: integer||address: string|| > |2|Homer|2|sample2| > |3|Ned|null|null| > |3|Jenny|null|null| > |1|Peter|1|sample0| > |1|Peter|1|sample1| -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45176) AggregatingAccumulator with TypedImperativeAggregate throwing ClassCastException
[ https://issues.apache.org/jira/browse/SPARK-45176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45176: Description: Probably related to SPARK-39044. But potentially also this comment in Executor.scala. {quote}// TODO: do not serialize value twice val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks) {quote} The class cast exception I'm seeing is {quote} java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir {quote} But I've seen it with other aggregation buffers like QuantileSummaries as well. It's my belief that withBufferSerialized() for the AggregatingAccumulator is being called twice, leading to serializeAggregateBufferInPlace(buffer) also being called twice for an Imperative aggregate; the second time round, the buffer is already a byte array and the asInstanceOf[T] in getBufferObject throws. This doesn't appear to happen on all runs, and it might be that it's only occurring when there's a transitive exception. I have a further suspicion that the cause might originate with {quote} SerializationDebugger.improveException {quote} which traverses the task and forces writeExternal to be called. Setting spark.serializer.extraDebugInfo to false seems to make things a bit more reliable (I haven't seen the error with this setting applied), and points strongly in that direction. Stack trace: {quote} Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=15, partition=10) failed; but task commit success, data duplication may happen. reason=ExceptionFailure(java.io.IOException,java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed module of loader 'app'),[Ljava.lang.StackTraceElement;@7fe2f462,java.io.IOException: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed module of loader 'app') at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1502) at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:59) at java.base/java.io.ObjectOutputStream.writeExternalData(Unknown Source) at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source) at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.SerializerHelper$.serializeToChunkedBuffer(SerializerHelper.scala:42) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) Caused by: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed module of loader 'app') at 
org.apache.spark.sql.catalyst.expressions.aggregate.ReservoirSample.serialize(ReservoirSample.scala:33) at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624) at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:206) at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33) at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:186) at jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at java.base/java.io.ObjectStreamClass.invokeWriteReplace(Unknown Source) at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source) at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source) at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:62) at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2$adapted(TaskResult.scala:62) at scala.collection.immutable.Vector.foreach(Vector.scala:1856) at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:62) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at
[jira] [Created] (SPARK-45176) AggregatingAccumulator with TypedImperativeAggregate throwing ClassCastException
Huw created SPARK-45176: --- Summary: AggregatingAccumulator with TypedImperativeAggregate throwing ClassCastException Key: SPARK-45176 URL: https://issues.apache.org/jira/browse/SPARK-45176 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.1, 3.4.0 Reporter: Huw Probably related to SPARK-39044. But potentially also this comment in Executor.scala. // TODO: do not serialize value twice val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks) The class cast exception I'm seeing is java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir But I've seen it with other aggregation buffers like QuantileSummaries as well. It's my belief that withBufferSerialized() for the Aggregating Accumulator is being called twice, leading to on serializeAggregateBuffernPlace(buffer) also being called twice for the an Imperative aggregate, the second time round, the buffer is already a byte array and the asInstanceOf[T] in getBufferObject is throwing. This doesn't appear to happen on all runs, and it might be its only occurring when there's a transitive exception. I have a further suspicion that the cause might originate with SerializationDebugger.improveException which is traversing the task and forcing writeExternal, to be called. Setting |spark.serializer.extraDebugInfo|false| Seems to make things a bit more reliable (I haven't seen the error while this setting is on), and points strongly in that direction. Stack trace: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=15, partition=10) failed; but task commit success, data duplication may happen. reason=ExceptionFailure(java.io.IOException,java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed module of loader 'app'),[Ljava.lang.StackTraceElement;@7fe2f462,java.io.IOException: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed module of loader 'app') at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1502) at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:59) at java.base/java.io.ObjectOutputStream.writeExternalData(Unknown Source) at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source) at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.SerializerHelper$.serializeToChunkedBuffer(SerializerHelper.scala:42) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) Caused by: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed module of loader 'app') at 
org.apache.spark.sql.catalyst.expressions.aggregate.ReservoirSample.serialize(ReservoirSample.scala:33) at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624) at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:206) at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33) at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:186) at jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at java.base/java.io.ObjectStreamClass.invokeWriteReplace(Unknown Source) at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source) at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source) at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:62) at
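A sketch of the mitigation mentioned in the report above: spark.serializer.extraDebugInfo is an existing Spark setting (default true); that disabling it avoids the ClassCastException is the reporter's observation, not a confirmed fix.

{code:scala}
// Disabling extraDebugInfo skips the SerializationDebugger traversal that the
// reporter suspects triggers the second serialization of the accumulator buffer.
// On a cluster the same setting can be passed as
//   --conf spark.serializer.extraDebugInfo=false
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-45176-mitigation")
  .master("local[*]") // illustrative only
  .config("spark.serializer.extraDebugInfo", "false")
  .getOrCreate()
{code}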
[jira] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort
[ https://issues.apache.org/jira/browse/SPARK-37487 ] Huw deleted comment on SPARK-37487: - was (Author: JIRAUSER288917): I think I've seen crashes because of this in production. I can't reproduce locally, but I believe that Imperative aggregates are having their `serialiseAggregateBufferInPlace` function called twice, the second time it's doing an unsafe coerce onto a byte buffer. {quote}Caused by: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigest ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigest is in unnamed module of loader 'app') at org.apache.spark.sql.catalyst.expressions.aggregate.ApproxQuantiles.serialize(ApproxQuantiles.scala:19) at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624) at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:206) at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33){quote} > CollectMetrics is executed twice if it is followed by a sort > > > Key: SPARK-37487 > URL: https://issues.apache.org/jira/browse/SPARK-37487 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0 >Reporter: Tanel Kiis >Priority: Major > Labels: correctness > > It is best examplified by this new UT in DataFrameCallbackSuite: > {code} > test("SPARK-37487: get observable metrics with sort by callback") { > val df = spark.range(100) > .observe( > name = "my_event", > min($"id").as("min_val"), > max($"id").as("max_val"), > // Test unresolved alias > sum($"id"), > count(when($"id" % 2 === 0, 1)).as("num_even")) > .observe( > name = "other_event", > avg($"id").cast("int").as("avg_val")) > .sort($"id".desc) > validateObservedMetrics(df) > } > {code} > The count and sum aggregate report twice the number of rows: > {code} > [info] - SPARK-37487: get observable metrics with sort by callback *** FAILED > *** (169 milliseconds) > [info] [0,99,9900,100] did not equal [0,99,4950,50] > (DataFrameCallbackSuite.scala:342) > [info] org.scalatest.exceptions.TestFailedException: > [info] at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) > [info] at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) > [info] at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) > [info] at > org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) > [info] at > org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342) > [info] at > org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350) > [info] at > org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324) > [info] at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) > [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) > [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) > [info] at org.scalatest.Transformer.apply(Transformer.scala:22) > [info] at org.scalatest.Transformer.apply(Transformer.scala:20) > {code} > I could not figure out how this happes. 
Hopefully the UT can help with > debugging -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45099) SortMergeExec with Outer using join forgets sort information
[ https://issues.apache.org/jira/browse/SPARK-45099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45099: Description: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft(case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer")) {quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. (Forgive the parens instead of brackets the code snippet please, Jira was inferring macros) was: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft(case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer")) {quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. > SortMergeExec with Outer using join forgets sort information > > > Key: SPARK-45099 > URL: https://issues.apache.org/jira/browse/SPARK-45099 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.1 >Reporter: Huw >Priority: Minor > > When performing a 'using' join with a sort hint in a full outer, the > ResolveNaturalAndUsingJoin will kick in and build a new join with Equality > conditions and a Projection like this: > {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, > r)), l.name)() } > {quote} > There's nothing wrong with this per se, but, SortMergeJoinExec has it's > output ordering for a full outer join as empty, even though these join pairs > in their final coalesced form actually are ordered. > This means that code like this: > {quote}frames.reduceLeft(case (l, r) => l.join(r.hint("merge"), usingColumns > = Seq("a", "b"), joinType = "outer")) > {quote} > Given a non empty list of frames, will not 'stream' without a shuffle step, > as each join forgets its sort order. > Ideally this whole operation wouldn't require any shuffles if all the frames > are grouped and sorted by the keys. > (Forgive the parens instead of brackets the code snippet please, Jira was > inferring macros) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45099) SortMergeExec with Outer using join forgets sort information
[ https://issues.apache.org/jira/browse/SPARK-45099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45099: Description: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft { case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") }{{{}{}}}{quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. was: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}{{frames.reduceLeft }}{{{}{ case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") }{}}}{{{}{}}}{quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. > SortMergeExec with Outer using join forgets sort information > > > Key: SPARK-45099 > URL: https://issues.apache.org/jira/browse/SPARK-45099 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.1 >Reporter: Huw >Priority: Minor > > When performing a 'using' join with a sort hint in a full outer, the > ResolveNaturalAndUsingJoin will kick in and build a new join with Equality > conditions and a Projection like this: > {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, > r)), l.name)() } > {quote} > There's nothing wrong with this per se, but, SortMergeJoinExec has it's > output ordering for a full outer join as empty, even though these join pairs > in their final coalesced form actually are ordered. > This means that code like this: > {quote}frames.reduceLeft { > case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), > joinType = "outer") > }{{{}{}}}{quote} > Given a non empty list of frames, will not 'stream' without a shuffle step, > as each join forgets its sort order. > Ideally this whole operation wouldn't require any shuffles if all the frames > are grouped and sorted by the keys. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45099) SortMergeExec with Outer using join forgets sort information
[ https://issues.apache.org/jira/browse/SPARK-45099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45099: Description: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft(case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer")) {quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. was: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft { case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") }{{{}{}}}{quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. > SortMergeExec with Outer using join forgets sort information > > > Key: SPARK-45099 > URL: https://issues.apache.org/jira/browse/SPARK-45099 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.1 >Reporter: Huw >Priority: Minor > > When performing a 'using' join with a sort hint in a full outer, the > ResolveNaturalAndUsingJoin will kick in and build a new join with Equality > conditions and a Projection like this: > {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, > r)), l.name)() } > {quote} > There's nothing wrong with this per se, but, SortMergeJoinExec has it's > output ordering for a full outer join as empty, even though these join pairs > in their final coalesced form actually are ordered. > This means that code like this: > {quote}frames.reduceLeft(case (l, r) => l.join(r.hint("merge"), usingColumns > = Seq("a", "b"), joinType = "outer")) > {quote} > Given a non empty list of frames, will not 'stream' without a shuffle step, > as each join forgets its sort order. > Ideally this whole operation wouldn't require any shuffles if all the frames > are grouped and sorted by the keys. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45099) SortMergeExec with Outer using join forgets sort information
[ https://issues.apache.org/jira/browse/SPARK-45099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45099: Description: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}{{frames.reduceLeft }}{{{}{ case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") }{}}}{{{}{}}}{quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. was: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft({ case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") }) {quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. > SortMergeExec with Outer using join forgets sort information > > > Key: SPARK-45099 > URL: https://issues.apache.org/jira/browse/SPARK-45099 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.1 >Reporter: Huw >Priority: Minor > > When performing a 'using' join with a sort hint in a full outer, the > ResolveNaturalAndUsingJoin will kick in and build a new join with Equality > conditions and a Projection like this: > {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, > r)), l.name)() } > {quote} > There's nothing wrong with this per se, but, SortMergeJoinExec has it's > output ordering for a full outer join as empty, even though these join pairs > in their final coalesced form actually are ordered. > This means that code like this: > {quote}{{frames.reduceLeft }}{{{}{ > case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), > joinType = "outer") > }{}}}{{{}{}}}{quote} > Given a non empty list of frames, will not 'stream' without a shuffle step, > as each join forgets its sort order. > Ideally this whole operation wouldn't require any shuffles if all the frames > are grouped and sorted by the keys. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45099) SortMergeExec with Outer using join forgets sort information
[ https://issues.apache.org/jira/browse/SPARK-45099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45099: Description: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft({ case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") }) {quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. was: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft { case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") } {quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. > SortMergeExec with Outer using join forgets sort information > > > Key: SPARK-45099 > URL: https://issues.apache.org/jira/browse/SPARK-45099 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.1 >Reporter: Huw >Priority: Minor > > When performing a 'using' join with a sort hint in a full outer, the > ResolveNaturalAndUsingJoin will kick in and build a new join with Equality > conditions and a Projection like this: > {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, > r)), l.name)() } > {quote} > There's nothing wrong with this per se, but, SortMergeJoinExec has it's > output ordering for a full outer join as empty, even though these join pairs > in their final coalesced form actually are ordered. > This means that code like this: > {quote}frames.reduceLeft({ case (l, r) => l.join(r.hint("merge"), > usingColumns = Seq("a", "b"), joinType = "outer") }) > {quote} > Given a non empty list of frames, will not 'stream' without a shuffle step, > as each join forgets its sort order. > Ideally this whole operation wouldn't require any shuffles if all the frames > are grouped and sorted by the keys. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45099) SortMergeExec with Outer using join forgets sort information
[ https://issues.apache.org/jira/browse/SPARK-45099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45099: Description: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft { case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") } {quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. was: When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but, SortMergeJoinExec has it's output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft { case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") } {quote} Given a non empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. > SortMergeExec with Outer using join forgets sort information > > > Key: SPARK-45099 > URL: https://issues.apache.org/jira/browse/SPARK-45099 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.1 >Reporter: Huw >Priority: Minor > > When performing a 'using' join with a sort hint in a full outer, the > ResolveNaturalAndUsingJoin will kick in and build a new join with Equality > conditions and a Projection like this: > {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, > r)), l.name)() } > {quote} > There's nothing wrong with this per se, but, SortMergeJoinExec has it's > output ordering for a full outer join as empty, even though these join pairs > in their final coalesced form actually are ordered. > This means that code like this: > {quote}frames.reduceLeft { case (l, r) => l.join(r.hint("merge"), > usingColumns = Seq("a", "b"), joinType = "outer") } > {quote} > Given a non empty list of frames, will not 'stream' without a shuffle step, > as each join forgets its sort order. > Ideally this whole operation wouldn't require any shuffles if all the frames > are grouped and sorted by the keys. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45099) SortMergeExec with Outer using join forgets sort information
[ https://issues.apache.org/jira/browse/SPARK-45099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-45099: Priority: Minor (was: Major) > SortMergeExec with Outer using join forgets sort information > > > Key: SPARK-45099 > URL: https://issues.apache.org/jira/browse/SPARK-45099 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.1 >Reporter: Huw >Priority: Minor > > When performing a 'using' join with a sort hint in a full outer, the > ResolveNaturalAndUsingJoin will kick in and build a new join with Equality > conditions and a Projection like this: > {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, > r)), l.name)() } > {quote} > There's nothing wrong with this per se, but, SortMergeJoinExec has it's > output ordering for a full outer join as empty, even though these join pairs > in their final coalesced form actually are ordered. > This means that code like this: > {quote}frames.reduceLeft { case (l, r) => > l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") > } > {quote} > Given a non empty list of frames, will not 'stream' without a shuffle step, > as each join forgets its sort order. > Ideally this whole operation wouldn't require any shuffles if all the frames > are grouped and sorted by the keys. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45099) SortMergeExec with Outer using join forgets sort information
Huw created SPARK-45099: --- Summary: SortMergeExec with Outer using join forgets sort information Key: SPARK-45099 URL: https://issues.apache.org/jira/browse/SPARK-45099 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.1 Reporter: Huw When performing a 'using' join with a sort hint in a full outer, the ResolveNaturalAndUsingJoin will kick in and build a new join with Equality conditions and a Projection like this: {quote}val joinedCols = joinPairs.map \{ case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } {quote} There's nothing wrong with this per se, but SortMergeJoinExec has its output ordering for a full outer join as empty, even though these join pairs in their final coalesced form actually are ordered. This means that code like this: {quote}frames.reduceLeft { case (l, r) => l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer") } {quote} given a non-empty list of frames, will not 'stream' without a shuffle step, as each join forgets its sort order. Ideally this whole operation wouldn't require any shuffles if all the frames are grouped and sorted by the keys. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
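A sketch of how the missing ordering shows up in practice. It assumes an existing spark-shell SparkSession named spark; the frames are illustrative stand-ins for the pre-grouped, pre-sorted inputs described above.

{code:scala}
import spark.implicits._

val frames = Seq(
  Seq((1, 1, "x"), (2, 2, "y")).toDF("a", "b", "v1"),
  Seq((1, 1, "p"), (3, 3, "q")).toDF("a", "b", "v2"),
  Seq((2, 2, "m"), (3, 3, "n")).toDF("a", "b", "v3")
)

val joined = frames.reduceLeft { (l, r) =>
  l.join(r.hint("merge"), usingColumns = Seq("a", "b"), joinType = "outer")
}

// On the affected version the plan inserts an extra Exchange/Sort before the
// second SortMergeJoin, because the outer join reports an empty outputOrdering
// even though the coalesced keys are in fact ordered.
joined.explain()
{code}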
[jira] [Commented] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort
[ https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753471#comment-17753471 ] Huw commented on SPARK-37487: - I think I've seen crashes because of this in production. I can't reproduce locally, but I believe that Imperative aggregates are having their `serialiseAggregateBufferInPlace` function called twice, the second time it's doing an unsafe coerce onto a byte buffer. {quote}Caused by: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigest ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigest is in unnamed module of loader 'app') at org.apache.spark.sql.catalyst.expressions.aggregate.ApproxQuantiles.serialize(ApproxQuantiles.scala:19) at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624) at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:206) at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33){quote} > CollectMetrics is executed twice if it is followed by a sort > > > Key: SPARK-37487 > URL: https://issues.apache.org/jira/browse/SPARK-37487 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0 >Reporter: Tanel Kiis >Priority: Major > Labels: correctness > > It is best examplified by this new UT in DataFrameCallbackSuite: > {code} > test("SPARK-37487: get observable metrics with sort by callback") { > val df = spark.range(100) > .observe( > name = "my_event", > min($"id").as("min_val"), > max($"id").as("max_val"), > // Test unresolved alias > sum($"id"), > count(when($"id" % 2 === 0, 1)).as("num_even")) > .observe( > name = "other_event", > avg($"id").cast("int").as("avg_val")) > .sort($"id".desc) > validateObservedMetrics(df) > } > {code} > The count and sum aggregate report twice the number of rows: > {code} > [info] - SPARK-37487: get observable metrics with sort by callback *** FAILED > *** (169 milliseconds) > [info] [0,99,9900,100] did not equal [0,99,4950,50] > (DataFrameCallbackSuite.scala:342) > [info] org.scalatest.exceptions.TestFailedException: > [info] at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) > [info] at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) > [info] at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) > [info] at > org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) > [info] at > org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342) > [info] at > org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350) > [info] at > org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324) > [info] at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) > [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) > [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) > [info] at org.scalatest.Transformer.apply(Transformer.scala:22) > [info] at org.scalatest.Transformer.apply(Transformer.scala:20) > {code} > I could not figure out how this happes. 
Hopefully the UT can help with > debugging -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26655) Support multiple aggregates in Structured Streaming append mode
[ https://issues.apache.org/jira/browse/SPARK-26655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532614#comment-17532614 ] Huw commented on SPARK-26655: - I hit the guards in UnsupportedOperationChecker recently, and considered that if I was using append mode it would be sound. Glad to see it's being looked into. I think this also applies to flatMapGroupsWithState, and specifically, the error "flatMapGroupsWithState in append mode is not supported with $outputMode output mode on a streaming DataFrame/Dataset". > Support multiple aggregates in Structured Streaming append mode > --- > > Key: SPARK-26655 > URL: https://issues.apache.org/jira/browse/SPARK-26655 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Arun Mahadevan >Priority: Major > Attachments: Watermarks and multiple aggregates in Spark strucutred > streaming_v1.pdf > > > Right now multiple aggregates are not supported in structured streaming. > However, in append mode, the aggregates are emitted only after the watermark > passes the threshold (e.g. the window boundary) and the emitted value is not > affected by further late data. So it possible to chain multiple aggregates in > 'Append' output mode without worrying about retractions. > However the current event time watermarks in structured streaming are tracked > at a global level and this does not work when aggregates are chained. > We need to track the watermarks at individual operator level so that each > operator can make progress independently and not rely on global min or max > value. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
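A sketch of the kind of chained streaming aggregation the comment refers to. The rate source, window size, and column names are illustrative; on versions without multi-aggregate support the second aggregation is rejected by the UnsupportedOperationChecker when the query is started, even in append mode.

{code:scala}
import org.apache.spark.sql.functions._

val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .withWatermark("timestamp", "10 seconds")

val perMinute = events
  .groupBy(window(col("timestamp"), "1 minute"))
  .count()

// A second aggregate over the first; this is what the guard currently rejects.
val total = perMinute.agg(sum("count"))

// total.writeStream.outputMode("append").format("console").start()
{code}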
[jira] [Updated] (SPARK-39091) SQL Expression traits don't compose due to nodePatterns being final.
[ https://issues.apache.org/jira/browse/SPARK-39091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-39091: Summary: SQL Expression traits don't compose due to nodePatterns being final. (was: SQL Expression traits compose due to nodePatterns being final.) > SQL Expression traits don't compose due to nodePatterns being final. > > > Key: SPARK-39091 > URL: https://issues.apache.org/jira/browse/SPARK-39091 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0, 3.2.1 >Reporter: Huw >Priority: Major > > In Spark 3.1 I have an expression which contains these parts: > {code:scala} > case class MyExploder( > arrays: Expression, // Array[AnyDataType] > asOfDate: Expression, // LambdaFunction[AnyDataType -> TimestampType] > extractor: Expression, // TimestampType > ) extends HigherOrderFunction with Generator with TimeZoneAwareExpression { > override def arguments: Seq[Expression] = > Seq(arrays, asOfDate) > override def argumentTypes: Seq[AbstractDataType] = > Seq(ArrayType, TimestampType) > override def functions: Seq[Expression] = > Seq(extractor) > override def functionTypes = > Seq(TimestampType) > }{code} > > This is a grossly simplified example. The extractor is a lambda which can > gather information from a nested array, and explodes based on some business > logic. > When upgrading to Spark 3.2, however, this no longer works, because the mixed-in traits > have conflicting final definitions of nodePatterns. > {code:java} > trait HigherOrderFunction extends Expression with ExpectsInputTypes { > final override val nodePatterns: Seq[TreePattern] = Seq(HIGH_ORDER_FUNCTION) > } {code} > > We get this error. > {noformat} > value nodePatterns in trait TimeZoneAwareExpression of type > Seq[org.apache.spark.sql.catalyst.trees.TreePattern.TreePattern] cannot > override final member{noformat} > > This blocks us from upgrading. What's doubly annoying is that the actual > value of the member appears to be the same. > > Thank you for your time. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
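The conflict described above can be reproduced outside Spark with a self-contained sketch. The trait and pattern names below mirror the report but are simplified stand-ins, not the actual catalyst definitions: whichever trait comes later in the class linearization has to override nodePatterns, and it cannot, because the other trait has pinned the member as final, even when both would assign an equivalent value.

{code:scala}
sealed trait TreePattern
case object HIGH_ORDER_FUNCTION extends TreePattern
case object TIME_ZONE_AWARE extends TreePattern

trait Expression {
  val nodePatterns: Seq[TreePattern] = Nil
}

trait HigherOrderFunction extends Expression {
  // Pinned as final, mirroring the Spark 3.2 trait quoted above.
  final override val nodePatterns: Seq[TreePattern] = Seq(HIGH_ORDER_FUNCTION)
}

trait TimeZoneAwareExpression extends Expression {
  override val nodePatterns: Seq[TreePattern] = Seq(TIME_ZONE_AWARE)
}

// Mixing the two traits fails to compile with:
//   value nodePatterns in trait TimeZoneAwareExpression cannot override final member
// case class MyExploder() extends HigherOrderFunction with TimeZoneAwareExpression
{code}

If the member were not final, the usual Scala linearization rules would apply: the override from the trait that comes last in the linearization would win, or the concrete class could override the member itself and combine both pattern sets explicitly.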
[jira] [Updated] (SPARK-39091) SQL Expression traits compose due to nodePatterns being final.
[ https://issues.apache.org/jira/browse/SPARK-39091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huw updated SPARK-39091: Description: In Spark 3.1 I have an expression which contains these parts: {code:scala} case class MyExploder( arrays: Expression, // Array[AnyDataType] asOfDate: Expression, // LambdaFunction[AnyDataType -> TimestampType] extractor: Expression, // TimestampType ) extends HigherOrderFunction with Generator with TimeZoneAwareExpression { override def arguments: Seq[Expression] = Seq(arrays, asOfDate) override def argumentTypes: Seq[AbstractDataType] = Seq(ArrayType, TimestampType) override def functions: Seq[Expression] = Seq(extractor) override def functionTypes = Seq(TimestampType) }{code} This is a grossly simplified example. The extractor is a lambda which can gather information from a nested array, and explodes based on some business logic. When upgrading to Spark 3.2, however, this no longer works, because the mixed-in traits have conflicting final definitions of nodePatterns. {code:java} trait HigherOrderFunction extends Expression with ExpectsInputTypes { final override val nodePatterns: Seq[TreePattern] = Seq(HIGH_ORDER_FUNCTION) } {code} We get this error. {noformat} value nodePatterns in trait TimeZoneAwareExpression of type Seq[org.apache.spark.sql.catalyst.trees.TreePattern.TreePattern] cannot override final member{noformat} This blocks us from upgrading. What's doubly annoying is that the actual value of the member appears to be the same. Thank you for your time. was: In Spark 3.1 I have an expression which contains these parts: {code:scala} case class MyExploder( arrays: Expression, // Array[AnyDataType] asOfDate: Expression, // LambdaFunction[AnyDataType -> TimestampType] extractor: Expression, // TimestampType ) extends HigherOrderFunction with Generator with TimeZoneAwareExpression { override def arguments: Seq[Expression] = Seq(arrays, asOfDate) override def argumentTypes: Seq[AbstractDataType] = Seq(ArrayType, TimestampType) override def functions: Seq[Expression] = Seq(extractor) override def functionTypes = Seq(TimestampType) }{code} This is a grossly simplified example. The extractor is a lambda which can gather information from a nested array, and explodes based on some business logic. When upgrading to Spark 3.2, however, this no longer works, because the mixed-in traits have conflicting final definitions of nodePatterns. {code:java} trait HigherOrderFunction extends Expression with ExpectsInputTypes { final override val nodePatterns: Seq[TreePattern] = Seq(HIGH_ORDER_FUNCTION) } {code} We get this error. {noformat} value nodePatterns in trait TimeZoneAwareExpression of type Seq[org.apache.spark.sql.catalyst.trees.TreePattern.TreePattern] cannot override final member{noformat} This blocks us from upgrading. What's doubly annoying is that the actual value of the member appears to be the same. > SQL Expression traits compose due to nodePatterns being final.
> -- > > Key: SPARK-39091 > URL: https://issues.apache.org/jira/browse/SPARK-39091 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0, 3.2.1 >Reporter: Huw >Priority: Major > > In Spark 3.1 I have an expression which contains these parts: > {code:scala} > case class MyExploder( > arrays: Expression, // Array[AnyDataType] > asOfDate: Expression, // LambdaFunction[AnyDataType -> TimestampType] > extractor: Expression, // TimestampType > ) extends HigherOrderFunction with Generator with TimeZoneAwareExpression { > override def arguments: Seq[Expression] = > Seq(arrays, asOfDate) > override def argumentTypes: Seq[AbstractDataType] = > Seq(ArrayType, TimestampType) > override def functions: Seq[Expression] = > Seq(extractor) > override def functionTypes = > Seq(TimestampType) > }{code} > > This is a grossly simplified example. The extractor is a lambda which can > gather information from a nested array, and explodes based on some business > logic. > When upgrading to Spark 3.2, however, this no longer works, because the mixed-in traits > have conflicting final definitions of nodePatterns. > {code:java} > trait HigherOrderFunction extends Expression with ExpectsInputTypes { > final override val nodePatterns: Seq[TreePattern] = Seq(HIGH_ORDER_FUNCTION) > } {code} > > We get this error. > {noformat} > value nodePatterns in trait TimeZoneAwareExpression of type > Seq[org.apache.spark.sql.catalyst.trees.TreePattern.TreePattern] cannot > override final member{noformat} > > This blocks us from upgrading. What's doubly annoying is that the actual > value of the member appears to be the same. > > Thank you for your time. -- This message was sent by Atlassian Jira
[jira] [Created] (SPARK-39091) SQL Expression traits compose due to nodePatterns being final.
Huw created SPARK-39091: --- Summary: SQL Expression traits compose due to nodePatterns being final. Key: SPARK-39091 URL: https://issues.apache.org/jira/browse/SPARK-39091 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.1, 3.2.0 Reporter: Huw In Spark 3.1 I have an expression which contains these parts: {code:scala} case class MyExploder( arrays: Expression, // Array[AnyDataType] asOfDate: Expression, // LambdaFunction[AnyDataType -> TimestampType] extractor: Expression, // TimestampType ) extends HigherOrderFunction with Generator with TimeZoneAwareExpression { override def arguments: Seq[Expression] = Seq(arrays, asOfDate) override def argumentTypes: Seq[AbstractDataType] = Seq(ArrayType, TimestampType) override def functions: Seq[Expression] = Seq(extractor) override def functionTypes = Seq(TimestampType) }{code} This is a grossly simplified example. The extractor is a lambda which can gather information from a nested array, and explodes based on some business logic. When upgrading to Spark 3.2, however, this no longer works, because the mixed-in traits have conflicting final definitions of nodePatterns. {code:java} trait HigherOrderFunction extends Expression with ExpectsInputTypes { final override val nodePatterns: Seq[TreePattern] = Seq(HIGH_ORDER_FUNCTION) } {code} We get this error. {noformat} value nodePatterns in trait TimeZoneAwareExpression of type Seq[org.apache.spark.sql.catalyst.trees.TreePattern.TreePattern] cannot override final member{noformat} This blocks us from upgrading. What's doubly annoying is that the actual value of the member appears to be the same. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org