[jira] [Assigned] (SPARK-47927) Nullability after join not respected in UDF

2024-04-27 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-47927:
---

Assignee: Emil Ejbyfeldt

> Nullability after join not respected in UDF
> ---
>
> Key: SPARK-47927
> URL: https://issues.apache.org/jira/browse/SPARK-47927
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 4.0.0, 3.5.1, 3.4.3
>Reporter: Emil Ejbyfeldt
>Assignee: Emil Ejbyfeldt
>Priority: Major
>  Labels: correctness, pull-request-available
>
> {code:java}
> val ds1 = Seq(1).toDS()
> val ds2 = Seq[Int]().toDS()
> val f = udf[(Int, Option[Int]), (Int, Option[Int])](identity)
> ds1.join(ds2, ds1("value") === ds2("value"), "outer")
>   .select(f(struct(ds1("value"), ds2("value")))).show()
> ds1.join(ds2, ds1("value") === ds2("value"), "outer")
>   .select(struct(ds1("value"), ds2("value"))).show() {code}
> outputs
> {code:java}
> +---------------------------------------+
> |UDF(struct(value, value, value, value))|
> +---------------------------------------+
> |                                 {1, 0}|
> +---------------------------------------+
> +--------------------+
> |struct(value, value)|
> +--------------------+
> |           {1, NULL}|
> +--------------------+ {code}
> So when the result is passed to the UDF, the nullability after the join is 
> not respected and we incorrectly end up with a 0 value instead of a null/None 
> value.
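>
> A minimal sketch of the expected behaviour (assuming the same {{ds1}}, 
> {{ds2}} and {{f}} as above; an illustration, not part of the reported repro):
> {code:java}
> val joined = ds1.join(ds2, ds1("value") === ds2("value"), "outer")
> // After an outer join the field coming from the empty side should be
> // nullable, which printTreeString() reports on the struct schema.
> joined.select(struct(ds1("value"), ds2("value"))).schema.printTreeString()
> // A UDF typed over Option[Int], like f above, should then receive None
> // for the unmatched row rather than the default value 0.
> joined.select(f(struct(ds1("value"), ds2("value")))).show()
> {code}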



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47927) Nullability after join not respected in UDF

2024-04-27 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-47927.
-
Fix Version/s: 3.4.4
   3.5.2
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 46156
[https://github.com/apache/spark/pull/46156]

> Nullability after join not respected in UDF
> ---
>
> Key: SPARK-47927
> URL: https://issues.apache.org/jira/browse/SPARK-47927
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 4.0.0, 3.5.1, 3.4.3
>Reporter: Emil Ejbyfeldt
>Assignee: Emil Ejbyfeldt
>Priority: Major
>  Labels: correctness, pull-request-available
> Fix For: 3.4.4, 3.5.2, 4.0.0
>
>
> {code:java}
> val ds1 = Seq(1).toDS()
> val ds2 = Seq[Int]().toDS()
> val f = udf[(Int, Option[Int]), (Int, Option[Int])](identity)
> ds1.join(ds2, ds1("value") === ds2("value"), "outer")
>   .select(f(struct(ds1("value"), ds2("value")))).show()
> ds1.join(ds2, ds1("value") === ds2("value"), "outer")
>   .select(struct(ds1("value"), ds2("value"))).show() {code}
> outputs
> {code:java}
> +---------------------------------------+
> |UDF(struct(value, value, value, value))|
> +---------------------------------------+
> |                                 {1, 0}|
> +---------------------------------------+
> +--------------------+
> |struct(value, value)|
> +--------------------+
> |           {1, NULL}|
> +--------------------+ {code}
> So when the result is passed to the UDF, the nullability after the join is 
> not respected and we incorrectly end up with a 0 value instead of a null/None 
> value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-48019) ColumnVectors with dictionaries and nulls are not read/copied correctly

2024-04-27 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-48019:
---

Assignee: Gene Pang

> ColumnVectors with dictionaries and nulls are not read/copied correctly
> ---
>
> Key: SPARK-48019
> URL: https://issues.apache.org/jira/browse/SPARK-48019
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.3
>Reporter: Gene Pang
>Assignee: Gene Pang
>Priority: Major
>  Labels: pull-request-available
>
> {{ColumnVectors}} have APIs like {{getInts}}, {{getFloats}} and so on. Those 
> return a primitive array with the contents of the vector. When the 
> ColumnVector has a dictionary, the values are decoded with the dictionary 
> before filling in the primitive array.
> However, {{ColumnVectors}} can have nulls, and for those {{null}} entries 
> the dictionary id is irrelevant and can even be invalid, so the dictionary 
> should not be used for the {{null}} entries of the vector. Sometimes this 
> can cause an {{ArrayIndexOutOfBoundsException}}.
> In addition to the possible exception, copying a {{ColumnarArray}} is not 
> correct. A {{ColumnarArray}} contains a {{ColumnVector}}, so it can contain 
> {{null}} values. However, {{copy()}} for primitive types does not take the 
> null-ness of the entries into account and blindly copies all the primitive 
> values, which means the null entries get lost.
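>
> A minimal sketch of a null-aware read path (an assumption for illustration, 
> not the actual change made in the linked PR):
> {code:java}
> import org.apache.spark.sql.vectorized.ColumnVector
>
> // Decode `count` ints starting at `rowId`, skipping null slots so that the
> // (possibly invalid) dictionary id of a null entry is never dereferenced.
> def getIntsNullAware(v: ColumnVector, rowId: Int, count: Int): Array[Int] = {
>   val out = new Array[Int](count)
>   var i = 0
>   while (i < count) {
>     // only non-null entries are decoded (getInt resolves the dictionary)
>     if (!v.isNullAt(rowId + i)) out(i) = v.getInt(rowId + i)
>     i += 1
>   }
>   out // null slots stay 0; callers must still consult isNullAt
> }
> {code}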



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-48019) ColumnVectors with dictionaries and nulls are not read/copied correctly

2024-04-27 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-48019.
-
Fix Version/s: 3.5.2
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 46254
[https://github.com/apache/spark/pull/46254]

> ColumnVectors with dictionaries and nulls are not read/copied correctly
> ---
>
> Key: SPARK-48019
> URL: https://issues.apache.org/jira/browse/SPARK-48019
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.3
>Reporter: Gene Pang
>Assignee: Gene Pang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 3.5.2, 4.0.0
>
>
> {{ColumnVectors}} have APIs like {{getInts}}, {{getFloats}} and so on. Those 
> return a primitive array with the contents of the vector. When the 
> ColumnVector has a dictionary, the values are decoded with the dictionary 
> before filling in the primitive array.
> However, {{ColumnVectors}} can have nulls, and for those {{null}} entries 
> the dictionary id is irrelevant and can even be invalid, so the dictionary 
> should not be used for the {{null}} entries of the vector. Sometimes this 
> can cause an {{ArrayIndexOutOfBoundsException}}.
> In addition to the possible exception, copying a {{ColumnarArray}} is not 
> correct. A {{ColumnarArray}} contains a {{ColumnVector}}, so it can contain 
> {{null}} values. However, {{copy()}} for primitive types does not take the 
> null-ness of the entries into account and blindly copies all the primitive 
> values, which means the null entries get lost.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-48019) ColumnVectors with dictionaries and nulls are not read/copied correctly

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-48019:
---
Labels: pull-request-available  (was: )

> ColumnVectors with dictionaries and nulls are not read/copied correctly
> ---
>
> Key: SPARK-48019
> URL: https://issues.apache.org/jira/browse/SPARK-48019
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.3
>Reporter: Gene Pang
>Priority: Major
>  Labels: pull-request-available
>
> {{ColumnVectors}} have APIs like {{getInts}}, {{getFloats}} and so on. Those 
> return a primitive array with the contents of the vector. When the 
> ColumnVector has a dictionary, the values are decoded with the dictionary 
> before filling in the primitive array.
> However, {{ColumnVectors}} can have nulls, and for those {{null}} entries 
> the dictionary id is irrelevant and can even be invalid, so the dictionary 
> should not be used for the {{null}} entries of the vector. Sometimes this 
> can cause an {{ArrayIndexOutOfBoundsException}}.
> In addition to the possible exception, copying a {{ColumnarArray}} is not 
> correct. A {{ColumnarArray}} contains a {{ColumnVector}}, so it can contain 
> {{null}} values. However, {{copy()}} for primitive types does not take the 
> null-ness of the entries into account and blindly copies all the primitive 
> values, which means the null entries get lost.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-48021) Add `--add-modules=jdk.incubator.vector` to `JavaModuleOptions`

2024-04-27 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-48021:
-

Assignee: BingKun Pan

> Add `--add-modules=jdk.incubator.vector` to `JavaModuleOptions`
> ---
>
> Key: SPARK-48021
> URL: https://issues.apache.org/jira/browse/SPARK-48021
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 4.0.0
>Reporter: BingKun Pan
>Assignee: BingKun Pan
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-48021) Add `--add-modules=jdk.incubator.vector` to `JavaModuleOptions`

2024-04-27 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-48021.
---
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46246
[https://github.com/apache/spark/pull/46246]

> Add `--add-modules=jdk.incubator.vector` to `JavaModuleOptions`
> ---
>
> Key: SPARK-48021
> URL: https://issues.apache.org/jira/browse/SPARK-48021
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 4.0.0
>Reporter: BingKun Pan
>Assignee: BingKun Pan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-48023) Stream-stream join with windowed+watermarked dropDuplicates suppresses all rows

2024-04-27 Thread Scott Schenkein (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Schenkein updated SPARK-48023:

Description: 
When applying a streaming dropDuplicates-with-watermark to a self-referential 
stream-stream left-join with watermark, the dropDuplicates drops all rows. If 
the watermark is eliminated from the dropDuplicates, the query behaves as 
expected.

 

The code below demonstrates the error:

 
{code:java}
# 
# 1. Generate the test data
#
size = 1000
step_secs = 300
event_offset_secs = 240
# Add some lines to not left join
skips = set()
skips.add(3)
skips.add(18)
skips.add(800)

def lit_ts(secs):
    return datetime.datetime.fromtimestamp(secs)

data = []
base_time = time.time()
for x in range(size):
    ts = base_time + (step_secs * x)
    data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
    if x not in skips:
        data.append(
            {
                "event_id": f"two_{x}",
                "join_id": x,
                "ts": lit_ts(ts - event_offset_secs),
            }
        )

# Add duplicates to validate the dropDuplicates
for i in range(len(data)):
   data.append(data[i].copy()) 

#
# 2. Write the results so we can stream
#
path = "/tmp/bugtest"
df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP")
df.repartition(1).write.format("delta").mode("overwrite").save(path)


df = spark.read.format("delta").load(path)
df.createOrReplaceTempView("test_data")
#
# 3. Define the test query
#
sql = """
with also_test_data as (
   select * from test_data
   where event_id like 'two%'
)
select td.* from test_data
td left join also_test_data on (
   td.join_id = also_test_data.join_id
   and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
   and also_test_data.ts <= td.ts
)
where td.event_id like 'one%'
-- rows where left-join does not match
and also_test_data.event_id is NULL 
"""
#
# 4. Run it non-streaming w/non-deDuplicated to validate results
#
res = spark.sql(sql)
print("Static query")
res.show(truncate=False)
# Static query
# +--------+-------+-------------------------+
# |event_id|join_id|ts                       |
# +--------+-------+-------------------------+
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# +--------+-------+-------------------------+


#
# 5. Run it as a stream with no-dropDuplicates
#
def write_stream(res):
    (   
        res.writeStream.outputMode("append")
        .trigger(availableNow=True)
        .format("console")
        .start()
        .awaitTermination()
    )
sdf = spark.readStream.format("delta").load(path).withWatermark('ts', '15 minutes')
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
write_stream(res)
# Batch: 1
# ---
# +--------+-------+--------------------+
# |event_id|join_id|                  ts|
# +--------+-------+--------------------+
# | one_800|    800|2024-04-30 12:01:...|
# | one_800|    800|2024-04-30 12:01:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |  one_18|     18|2024-04-27 18:51:...|
# +--------+-------+--------------------+

#
# 6. Run it as a stream with dropDuplicates, but no extra watermark
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    (res.select(window("ts", "30 minutes"), "*"))
    .dropDuplicates(["event_id"]).drop("window")
)
write_stream(res)
# ---                                 
# Batch: 1
# ---
# +--------+-------+--------------------+
# |event_id|join_id|                  ts|
# +--------+-------+--------------------+
# | one_800|    800|2024-04-30 12:01:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |   one_3|      3|2024-04-27 17:36:...|
# +--------+-------+--------------------+

#
# 7. Run it as a stream with dropDuplicates, using a watermark
# THIS is where we see the error
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    (res.select(window("ts", "30 minutes"), "*"))
    .withWatermark("window", '30 minutes')
    .dropDuplicates(["event_id"]).drop("window")
)
write_stream(res) 
print("END")
# ---                                  
# Batch: 0
# ---
# +--------+-------+---+
# |event_id|join_id| ts|
# +--------+-------+---+
# +--------+-------+---+
# END{code}
 

 

  was:
When applying a streaming 

[jira] [Updated] (SPARK-48023) Stream-stream join with windowed+watermarked drop-duplicates suppresses all rows

2024-04-27 Thread Scott Schenkein (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Schenkein updated SPARK-48023:

Description: 
When applying a streaming dropDuplicates-with-watermark to a self-referential 
stream-stream left-join with watermark, the dropDuplicates drops all rows. If 
the watermark is eliminated from the dropDuplicates, the query behaves as 
expected.

 

The code below demonstrates the error:

 
{code:java}
# 
# 1. Generate the test data
#
size = 1000
step_secs = 300
event_offset_secs = 240
# Add some lines to not left join
skips = set()
skips.add(3)
skips.add(18)
skips.add(800)

def lit_ts(secs):
    return datetime.datetime.fromtimestamp(secs)

data = []
base_time = time.time()
for x in range(size):
    ts = base_time + (step_secs * x)
    data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
    if x not in skips:
        data.append(
            {
                "event_id": f"two_{x}",
                "join_id": x,
                "ts": lit_ts(ts - event_offset_secs),
            }
        )

# Add duplicates to validate the dropDuplicates
for i in range(len(data)):
   data.append(data[i].copy()) 

#
# 2. Write the results so we can stream
#
path = "/tmp/bugtest"
df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP")
df.repartition(1).write.format("delta").mode("overwrite").save(path)


df = spark.read.format("delta").load(path)
df.createOrReplaceTempView("test_data")
#
# 3. Define the test query
#
sql = """
with also_test_data as (
   select * from test_data
   where event_id like 'two%'
)
select td.* from test_data
td left join also_test_data on (
   td.join_id = also_test_data.join_id
   and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
   and also_test_data.ts <= td.ts
)
where td.event_id like 'one%'
-- rows where left-join does not match
and also_test_data.event_id is NULL 
"""
#
# 4. Run it non-streaming w/non-deDuplicated to validate results
#
res = spark.sql(sql)
print("Static query")
res.show(truncate=False)
# Static query
# ++---+-+
# |event_id|join_id|ts                       |
# ++---+-+
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# ++---+-+


#
# 5. Run it as a stream with no-dropDuplicates
#
def write_stream(res):
    (   
        res.writeStream.outputMode("append")
        .trigger(availableNow=True)
        .format("console")
        .start()
        .awaitTermination()
    )
sdf = spark.readStream.format("delta").load(path).withWatermark('ts', '15 minutes')
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
write_stream(res)
# Batch: 1
# ---
# ++---++
# |event_id|join_id|                  ts|
# ++---++
# | one_800|    800|2024-04-30 12:01:...|
# | one_800|    800|2024-04-30 12:01:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |  one_18|     18|2024-04-27 18:51:...|
# ++---++

#
# 6. Run it as a stream with dropDuplicates, but no extra watermark
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    (res.select(window("ts", "30 minutes"), "*"))
    .dropDuplicates(["event_id"]).drop("window")
)
write_stream(res)
# ---                                 
# Batch: 1
# ---
# ++---++
# |event_id|join_id|                  ts|
# ++---++
# | one_800|    800|2024-04-30 12:01:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |   one_3|      3|2024-04-27 17:36:...|
# ++---++

#
# 7. Run it as a stream with dropDuplicates, using a watermark
# THIS is where we see the error
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    (res.select(window("ts", "30 minutes"), "*"))
    .withWatermark("window", '30 minutes')
    .dropDuplicates(["event_id"]).drop("window")
)
write_stream(res) 
print("END")
# ---                                  
# Batch: 0
# ---
# ++---+---++
# |event_id|join_id| ts|detection_ts|
# ++---+---++
# ++---+---++
# 

[jira] [Updated] (SPARK-48023) Stream-stream join with windowed+watermarked dropDuplicates suppresses all rows

2024-04-27 Thread Scott Schenkein (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Schenkein updated SPARK-48023:

Summary: Stream-stream join with windowed+watermarked dropDuplicates 
suppresses all rows  (was: Stream-stream join with windowed+watermarked 
drop-duplicates suppresses all rows)

> Stream-stream join with windowed+watermarked dropDuplicates suppresses all 
> rows
> ---
>
> Key: SPARK-48023
> URL: https://issues.apache.org/jira/browse/SPARK-48023
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.5.0
>Reporter: Scott Schenkein
>Priority: Major
>
> When applying a streaming dropDuplicates-with-watermark to a 
> self-referential stream-stream left-join with watermark, the dropDuplicates 
> drops all rows. If the watermark is eliminated from the dropDuplicates, the 
> query behaves as expected.
>  
> The code below demonstrates the error:
>  
> {code:java}
> # 
> # 1. Generate the test data
> #
> size = 1000
> step_secs = 300
> event_offset_secs = 240
> # Add some lines to not left join
> skips = set()
> skips.add(3)
> skips.add(18)
> skips.add(800)
> def lit_ts(secs):
>     return datetime.datetime.fromtimestamp(secs)
> data = []
> base_time = time.time()
> for x in range(size):
>     ts = base_time + (step_secs * x)
>     data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
>     if x not in skips:
>         data.append(
>             {
>                 "event_id": f"two_{x}",
>                 "join_id": x,
>                 "ts": lit_ts(ts - event_offset_secs),
>             }
>         )
> # Add duplicates to validate the dropDuplicates
> for i in range(len(data)):
>    data.append(data[i].copy()) 
> #
> # 2. Write the results so we can stream
> #
> path = "/tmp/bugtest"
> df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP")
> df.repartition(1).write.format("delta").mode("overwrite").save(path)
> df = spark.read.format("delta").load(path)
> df.createOrReplaceTempView("test_data")
> #
> # 3. Define the test query
> #
> sql = """
> with also_test_data as (
>    select * from test_data
>    where event_id like 'two%'
> )
> select td.* from test_data
> td left join also_test_data on (
>    td.join_id = also_test_data.join_id
>    and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
>    and also_test_data.ts <= td.ts
> )
> where td.event_id like 'one%'
> -- rows where left-join does not match
> and also_test_data.event_id is NULL 
> """
> #
> # 4. Run it non-streaming w/non-deDuplicated to validate results
> #
> res = spark.sql(sql)
> print("Static query")
> res.show(truncate=False)
> # Static query
> # ++---+-+
> # |event_id|join_id|ts                       |
> # ++---+-+
> # |one_3   |3      |2024-04-27 17:36:54.97927|
> # |one_18  |18     |2024-04-27 18:51:54.97927|
> # |one_800 |800    |2024-04-30 12:01:54.97927|
> # |one_3   |3      |2024-04-27 17:36:54.97927|
> # |one_18  |18     |2024-04-27 18:51:54.97927|
> # |one_800 |800    |2024-04-30 12:01:54.97927|
> # ++---+-+
> #
> # 5. Run it as a stream with no-dropDuplicates
> #
> def write_stream(res):
>     (   
>         res.writeStream.outputMode("append")
>         .trigger(availableNow=True)
>         .format("console")
>         .start()
>         .awaitTermination()
>     )
> sdf = spark.readStream.format("delta").load(path).withWatermark('ts', '15 minutes')
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> write_stream(res)
> # Batch: 1
> # ---
> # ++---++
> # |event_id|join_id|                  ts|
> # ++---++
> # | one_800|    800|2024-04-30 12:01:...|
> # | one_800|    800|2024-04-30 12:01:...|
> # |   one_3|      3|2024-04-27 17:36:...|
> # |   one_3|      3|2024-04-27 17:36:...|
> # |  one_18|     18|2024-04-27 18:51:...|
> # |  one_18|     18|2024-04-27 18:51:...|
> # ++---++
> #
> # 6. Run it as a stream with dropDuplicates, but no extra watermark
> #
> sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> res = (
>     (res.select(window("ts", "30 minutes"), "*"))
>     .dropDuplicates(["event_id"]).drop("window")
> )
> write_stream(res)
> # ---                                 
> # Batch: 1
> # ---
> # ++---++
> # |event_id|join_id|                  ts|
> # ++---++
> # | one_800|    

[jira] [Updated] (SPARK-48023) Stream-stream join with windowed+watermarked drop-duplicates suppresses all rows

2024-04-27 Thread Scott Schenkein (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Schenkein updated SPARK-48023:

Summary: Stream-stream join with windowed+watermarked drop-duplicates 
suppresses all rows  (was: Stream-stream join with windowed drop-duplicates 
suppresses all rows)

> Stream-stream join with windowed+watermarked drop-duplicates suppresses all 
> rows
> 
>
> Key: SPARK-48023
> URL: https://issues.apache.org/jira/browse/SPARK-48023
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.5.0
>Reporter: Scott Schenkein
>Priority: Major
>
> When applying a streaming dropDuplicates-with-watermark to a 
> self-referential stream-stream left-join with watermark, the dropDuplicates 
> drops all rows. If the watermark is eliminated from the dropDuplicates, the 
> query behaves as expected.
>  
> The code below demonstrates the error:
>  
> {code:java}
> # 
> # 1. Generate the test data
> #
> size = 1000
> step_secs = 300
> event_offset_secs = 240
> # Add some lines to not left join
> skips = set()
> skips.add(3)
> skips.add(18)
> skips.add(800)
> def lit_ts(secs):
>     return datetime.datetime.fromtimestamp(secs)
> data = []
> base_time = time.time()
> for x in range(size):
>     ts = base_time + (step_secs * x)
>     data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
>     if x not in skips:
>         data.append(
>             {
>                 "event_id": f"two_{x}",
>                 "join_id": x,
>                 "ts": lit_ts(ts - event_offset_secs),
>             }
>         )
> # Add duplicates to validate the dropDuplicates
> for i in range(len(data)):
>    data.append(data[i].copy()) 
> #
> # 2. Write the results so we can stream
> #
> path = "/tmp/bugtest"
> df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP")
> df.repartition(1).write.format("delta").mode("overwrite").save(path)
> df = spark.read.format("delta").load(path)
> df.createOrReplaceTempView("test_data")
> #
> # 3. Define the test query
> #
> sql = """
> with also_test_data as (
>    select * from test_data
>    where event_id like 'two%'
> )
> select td.* from test_data
> td left join also_test_data on (
>    td.join_id = also_test_data.join_id
>    and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
>    and also_test_data.ts <= td.ts
> )
> where td.event_id like 'one%'
> and also_test_data.event_id is NULL
> """
> #
> # 4. Run it non-streaming w/non-deDuplicated to validate results
> #
> res = spark.sql(sql)
> print("Static query")
> res.show(truncate=False)
> # Static query
> # ++---+-+
> # |event_id|join_id|ts                       |
> # ++---+-+
> # |one_3   |3      |2024-04-27 17:36:54.97927|
> # |one_18  |18     |2024-04-27 18:51:54.97927|
> # |one_800 |800    |2024-04-30 12:01:54.97927|
> # |one_3   |3      |2024-04-27 17:36:54.97927|
> # |one_18  |18     |2024-04-27 18:51:54.97927|
> # |one_800 |800    |2024-04-30 12:01:54.97927|
> # ++---+-+
> #
> # 5. Run it as a stream with no-dropDuplicates
> #
> def write_stream(res):
>     (   
>         res.writeStream.outputMode("append")
>         .trigger(availableNow=True)
>         .format("console")
>         .start()
>         .awaitTermination()
>     )
> sdf = spark.readStream.format("delta").load(path).withWatermark('ts', '15 minutes')
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> write_stream(res)
> # Batch: 1
> # ---
> # ++---++
> # |event_id|join_id|                  ts|
> # ++---++
> # | one_800|    800|2024-04-30 12:01:...|
> # | one_800|    800|2024-04-30 12:01:...|
> # |   one_3|      3|2024-04-27 17:36:...|
> # |   one_3|      3|2024-04-27 17:36:...|
> # |  one_18|     18|2024-04-27 18:51:...|
> # |  one_18|     18|2024-04-27 18:51:...|
> # ++---++
> #
> # 6. Run it as a stream with dropDuplicates, but no extra watermark
> #
> sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> res = (
>     (res.select(window("ts", "30 minutes"), "*"))
>     .dropDuplicates(["event_id"]).drop("window")
> )
> write_stream(res)
> # ---                                 
> # Batch: 1
> # ---
> # ++---++
> # |event_id|join_id|                  ts|
> # ++---++
> # | one_800|    800|2024-04-30 12:01:...|
> # |  one_18|     

[jira] [Updated] (SPARK-48023) Stream-stream join with windowed drop-duplicates suppresses all rows

2024-04-27 Thread Scott Schenkein (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Schenkein updated SPARK-48023:

Description: 
When applying a streaming dropDuplicates-with-watermark to a self-referential 
stream-stream left-join with watermark, the dropDuplicates drops all rows. If 
the watermark is eliminated from the dropDuplicates, the query behaves as 
expected.

 

The code below demonstrates the error:

 
{code:java}
# 
# 1. Generate the test data
#
size = 1000
step_secs = 300
event_offset_secs = 240
# Add some lines to not left join
skips = set()
skips.add(3)
skips.add(18)
skips.add(800)

def lit_ts(secs):
    return datetime.datetime.fromtimestamp(secs)

data = []
base_time = time.time()
for x in range(size):
    ts = base_time + (step_secs * x)
    data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
    if x not in skips:
        data.append(
            {
                "event_id": f"two_{x}",
                "join_id": x,
                "ts": lit_ts(ts - event_offset_secs),
            }
        )

# Add duplicates to validate the dropDuplicates
for i in range(len(data)):
   data.append(data[i].copy()) 

#
# 2. Write the results so we can stream
#
path = "/tmp/bugtest"
df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP")
df.repartition(1).write.format("delta").mode("overwrite").save(path)


df = spark.read.format("delta").load(path)
df.createOrReplaceTempView("test_data")
#
# 3. Define the test query
#
sql = """
with also_test_data as (
   select * from test_data
   where event_id like 'two%'
)
select td.* from test_data
td left join also_test_data on (
   td.join_id = also_test_data.join_id
   and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
   and also_test_data.ts <= td.ts
)
where td.event_id like 'one%'
and also_test_data.event_id is NULL
"""
#
# 4. Run it non-streaming w/non-deDuplicated to validate results
#
res = spark.sql(sql)
print("Static query")
res.show(truncate=False)
# Static query
# ++---+-+
# |event_id|join_id|ts                       |
# ++---+-+
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# ++---+-+


#
# 5. Run it as a stream with no-dropDuplicates
#
def write_stream(res):
    (   
        res.writeStream.outputMode("append")
        .trigger(availableNow=True)
        .format("console")
        .start()
        .awaitTermination()
    )
sdf = spark.readStream.format("delta").load(path).withWatermark('ts', '15 minutes')
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
write_stream(res)
# Batch: 1
# ---
# ++---++
# |event_id|join_id|                  ts|
# ++---++
# | one_800|    800|2024-04-30 12:01:...|
# | one_800|    800|2024-04-30 12:01:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |  one_18|     18|2024-04-27 18:51:...|
# ++---++

#
# 6. Run it as a stream with dropDuplicates, but no extra watermark
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    (res.select(window("ts", "30 minutes"), "*"))
    .dropDuplicates(["event_id"]).drop("window")
)
write_stream(res)
# ---                                 
# Batch: 1
# ---
# ++---++
# |event_id|join_id|                  ts|
# ++---++
# | one_800|    800|2024-04-30 12:01:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |   one_3|      3|2024-04-27 17:36:...|
# ++---++

#
# 7. Run it as a stream with dropDuplicates, using a watermark
# THIS is where we see the error
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    (res.select(window("ts", "30 minutes"), "*"))
    .withWatermark("window", '30 minutes')
    .dropDuplicates(["event_id"]).drop("window")
)
write_stream(res) 
print("END")
# ---                                  
# Batch: 0
# ---
# ++---+---++
# |event_id|join_id| ts|detection_ts|
# ++---+---++
# ++---+---++
# END{code}
 

 

  was:
When applying a 

[jira] [Created] (SPARK-48023) Stream-stream join with windowed drop-duplicates suppresses all rows

2024-04-27 Thread Scott Schenkein (Jira)
Scott Schenkein created SPARK-48023:
---

 Summary: Stream-stream join with windowed drop-duplicates 
suppresses all rows
 Key: SPARK-48023
 URL: https://issues.apache.org/jira/browse/SPARK-48023
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.5.0
Reporter: Scott Schenkein


When applying a streaming dropDuplicates-with-watermark to a self-referential 
stream-stream left-join with watermark, the dropDuplicates drops all rows. If 
the watermark is eliminated from the dropDuplicates, the query behaves as 
expected.

 

The code below demonstrates the error:

 
{code:java}
# 
# 1. Generate the test data
#
size = 1000
step_secs = 300
event_offset_secs = 240
# Add some lines to not left join
skips = set()
skips.add(3)
skips.add(18)
skips.add(800)

def lit_ts(secs):
    return datetime.datetime.fromtimestamp(secs)

data = []
base_time = time.time()
for x in range(size):
    ts = base_time + (step_secs * x)
    data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
    if x not in skips:
        data.append(
            {
                "event_id": f"two_{x}",
                "join_id": x,
                "ts": lit_ts(ts - event_offset_secs),
            }
        )

# Add duplicates to validate the dropDuplicates
for i in range(len(data)):
   data.append(data[i].copy()) 

#
# 2. Write the results so we can stream
#
path = "/tmp/bugtest"
df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP")
df.repartition(1).write.format("delta").mode("overwrite").save(path)


df = spark.read.format("delta").load(path)
df.createOrReplaceTempView("test_data")
#
# 3. Define the test query
#
sql = """
with also_test_data as (
   select * from test_data
   where event_id like 'two%'
)
select td.* from test_data
td left join also_test_data on (
   td.join_id = also_test_data.join_id
   and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
   and also_test_data.ts <= td.ts
)
where td.event_id like 'one%'
and also_test_data.event_id is NULL
"""
#
# 4. Run it non-streaming w/non-deDuplicated to validate results
#
res = spark.sql(sql)
print("Static query")
res.show(truncate=False)
# ++---+-+
# |event_id|join_id|ts                       |
# ++---+-+
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# ++---+-+


#
# 5. Run it as a stream with no-dropDuplicates
#
def write_stream(res):
    (   
        res.writeStream.outputMode("append")
        .trigger(availableNow=True)
        .format("console")
        .start()
        .awaitTermination()
    )
sdf = spark.readStream.format("delta").load(path).withWatermark('ts', '15 minutes')
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
write_stream(res)
# Batch: 1
# ---
# ++---++
# |event_id|join_id|                  ts|
# ++---++
# | one_800|    800|2024-04-30 12:01:...|
# | one_800|    800|2024-04-30 12:01:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |  one_18|     18|2024-04-27 18:51:...|
# ++---++

#
# 6. Run it as a stream with dropDuplicates, but no extra watermark
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    (res.select(window("ts", "30 minutes"), "*"))
    .dropDuplicates(["event_id"]).drop("window")
)
write_stream(res)
# ---                                 
# Batch: 1
# ---
# ++---++
# |event_id|join_id|                  ts|
# ++---++
# | one_800|    800|2024-04-30 12:01:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |   one_3|      3|2024-04-27 17:36:...|
# ++---++

#
# 7. Run it as a stream with dropDuplicates, using a watermark
# THIS is where we see the error
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    (res.select(window("ts", "30 minutes"), "*"))
    .withWatermark("window", '30 minutes')
    .dropDuplicates(["event_id"]).drop("window")
)
write_stream(res) 
print("END")
# ---                                  
# Batch: 0
# ---

[jira] [Assigned] (SPARK-48020) Pin 'pandas==2.2.2'

2024-04-27 Thread Kent Yao (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kent Yao reassigned SPARK-48020:


Assignee: Ruifeng Zheng

> Pin 'pandas==2.2.2'
> ---
>
> Key: SPARK-48020
> URL: https://issues.apache.org/jira/browse/SPARK-48020
> Project: Spark
>  Issue Type: Test
>  Components: Project Infra, PySpark
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-48020) Pin 'pandas==2.2.2'

2024-04-27 Thread Kent Yao (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kent Yao resolved SPARK-48020.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46256
[https://github.com/apache/spark/pull/46256]

> Pin 'pandas==2.2.2'
> ---
>
> Key: SPARK-48020
> URL: https://issues.apache.org/jira/browse/SPARK-48020
> Project: Spark
>  Issue Type: Test
>  Components: Project Infra, PySpark
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-48021) Add `--add-modules=jdk.incubator.vector` to `JavaModuleOptions`

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-48021:
---
Labels: pull-request-available  (was: )

> Add `--add-modules=jdk.incubator.vector` to `JavaModuleOptions`
> ---
>
> Key: SPARK-48021
> URL: https://issues.apache.org/jira/browse/SPARK-48021
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 4.0.0
>Reporter: BingKun Pan
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48021) Add `--add-modules=jdk.incubator.vector` to `JavaModuleOptions`

2024-04-27 Thread BingKun Pan (Jira)
BingKun Pan created SPARK-48021:
---

 Summary: Add `--add-modules=jdk.incubator.vector` to 
`JavaModuleOptions`
 Key: SPARK-48021
 URL: https://issues.apache.org/jira/browse/SPARK-48021
 Project: Spark
  Issue Type: Improvement
  Components: Build
Affects Versions: 4.0.0
Reporter: BingKun Pan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-48012) SPJ: Support Transform Expressions for One Side Shuffle

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-48012:
--

Assignee: Apache Spark

> SPJ: Support Transform Expressions for One Side Shuffle
> ---
>
> Key: SPARK-48012
> URL: https://issues.apache.org/jira/browse/SPARK-48012
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.3
>Reporter: Szehon Ho
>Assignee: Apache Spark
>Priority: Major
>  Labels: pull-request-available
>
> SPARK-41471 allowed Spark to shuffle just one side and still conduct SPJ, if 
> the other side is KeyGroupedPartitioning.  However, the support was just for 
> a KeyGroupedPartition without any partition transform (day, year, bucket).  
> It will be useful to add support for partition transform as well, as there 
> are many tables partitioned by those transforms.
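>
> A hedged illustration of the target case (the {{Expressions}} calls below 
> are just examples from the DSv2 API, not part of this ticket's change):
> {code:java}
> import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
>
> // A v2 table reporting key-grouped partitioning over transformed columns,
> // e.g. days(ts) and bucket(16, id); with SPARK-41471 alone, only
> // untransformed grouping keys let Spark shuffle just one side.
> val partitioning: Array[Transform] = Array(
>   Expressions.days("ts"),
>   Expressions.bucket(16, "id")
> )
> {code}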



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47730) Support APP_ID and EXECUTOR_ID placeholder in labels

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-47730:
--

Assignee: (was: Apache Spark)

> Support APP_ID and EXECUTOR_ID placeholder in labels
> 
>
> Key: SPARK-47730
> URL: https://issues.apache.org/jira/browse/SPARK-47730
> Project: Spark
>  Issue Type: Improvement
>  Components: k8s
>Affects Versions: 3.5.1
>Reporter: Xi Chen
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-48012) SPJ: Support Transform Expressions for One Side Shuffle

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-48012:
--

Assignee: (was: Apache Spark)

> SPJ: Support Transform Expressions for One Side Shuffle
> ---
>
> Key: SPARK-48012
> URL: https://issues.apache.org/jira/browse/SPARK-48012
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.3
>Reporter: Szehon Ho
>Priority: Major
>  Labels: pull-request-available
>
> SPARK-41471 allowed Spark to shuffle just one side and still conduct SPJ, if 
> the other side is KeyGroupedPartitioning.  However, the support was just for 
> a KeyGroupedPartition without any partition transform (day, year, bucket).  
> It will be useful to add support for partition transform as well, as there 
> are many tables partitioned by those transforms.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47730) Support APP_ID and EXECUTOR_ID placeholder in labels

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-47730:
--

Assignee: Apache Spark

> Support APP_ID and EXECUTOR_ID placeholder in labels
> 
>
> Key: SPARK-47730
> URL: https://issues.apache.org/jira/browse/SPARK-47730
> Project: Spark
>  Issue Type: Improvement
>  Components: k8s
>Affects Versions: 3.5.1
>Reporter: Xi Chen
>Assignee: Apache Spark
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47730) Support APP_ID and EXECUTOR_ID placeholder in labels

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-47730:
--

Assignee: Apache Spark

> Support APP_ID and EXECUTOR_ID placeholder in labels
> 
>
> Key: SPARK-47730
> URL: https://issues.apache.org/jira/browse/SPARK-47730
> Project: Spark
>  Issue Type: Improvement
>  Components: k8s
>Affects Versions: 3.5.1
>Reporter: Xi Chen
>Assignee: Apache Spark
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47730) Support APP_ID and EXECUTOR_ID placeholder in labels

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-47730:
--

Assignee: (was: Apache Spark)

> Support APP_ID and EXECUTOR_ID placeholder in labels
> 
>
> Key: SPARK-47730
> URL: https://issues.apache.org/jira/browse/SPARK-47730
> Project: Spark
>  Issue Type: Improvement
>  Components: k8s
>Affects Versions: 3.5.1
>Reporter: Xi Chen
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47730) Support APP_ID and EXECUTOR_ID placeholder in labels

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-47730:
--

Assignee: Apache Spark

> Support APP_ID and EXECUTOR_ID placeholder in labels
> 
>
> Key: SPARK-47730
> URL: https://issues.apache.org/jira/browse/SPARK-47730
> Project: Spark
>  Issue Type: Improvement
>  Components: k8s
>Affects Versions: 3.5.1
>Reporter: Xi Chen
>Assignee: Apache Spark
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-48006) add SortOrder for window function which has no orderSpec

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-48006:
--

Assignee: Apache Spark

> add SortOrder for window function which has no orderSpec
> 
>
> Key: SPARK-48006
> URL: https://issues.apache.org/jira/browse/SPARK-48006
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: guihuawen
>Assignee: Apache Spark
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> I am migrating Hive SQL to Spark SQL.
>  
> In Hive SQL
>  
> hive> explain select *,row_number() over (partition by day) rn from 
> testdb.zeropart_db;
> OK
> Explain
>  
> In Spark SQL
> spark-sql> explain select *,row_number() over (partition by age ) rn  from 
> testdb.zeropart_db;
> plan
> == Physical Plan ==
> org.apache.spark.sql.AnalysisException: Window function row_number() requires 
> window to be ordered, please add ORDER BY clause. For example SELECT 
> row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY 
> window_ordering) from table
> Time taken: 0.172 seconds, Fetched 1 row(s)
>  
> For better compatibility with migration, new parameters are added to allow 
> the same behavior as Hive SQL.
>  
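> A hedged workaround sketch (an explicit but arbitrary ordering, not the 
> proposed parameter): repeating the partition key in ORDER BY makes Spark 
> accept the query while leaving the row numbering within each partition 
> effectively unordered, matching Hive's behaviour.
> {code:java}
> spark.sql(
>   """SELECT *, row_number() OVER (PARTITION BY age ORDER BY age) AS rn
>     |FROM testdb.zeropart_db""".stripMargin).explain()
> {code}
>  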



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47730) Support APP_ID and EXECUTOR_ID placeholder in labels

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-47730:
--

Assignee: (was: Apache Spark)

> Support APP_ID and EXECUTOR_ID placeholder in labels
> 
>
> Key: SPARK-47730
> URL: https://issues.apache.org/jira/browse/SPARK-47730
> Project: Spark
>  Issue Type: Improvement
>  Components: k8s
>Affects Versions: 3.5.1
>Reporter: Xi Chen
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-48006) add SortOrder for window function which has no orderSpec

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-48006:
--

Assignee: (was: Apache Spark)

> add SortOrder for window function which has no orderSpec
> 
>
> Key: SPARK-48006
> URL: https://issues.apache.org/jira/browse/SPARK-48006
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: guihuawen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> I am migrating Hive SQL to Spark SQL.
>  
> In Hive SQL
>  
> hive> explain select *,row_number() over (partition by day) rn from 
> testdb.zeropart_db;
> OK
> Explain
>  
> In Spark SQL
> spark-sql> explain select *,row_number() over (partition by age ) rn  from 
> testdb.zeropart_db;
> plan
> == Physical Plan ==
> org.apache.spark.sql.AnalysisException: Window function row_number() requires 
> window to be ordered, please add ORDER BY clause. For example SELECT 
> row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY 
> window_ordering) from table
> Time taken: 0.172 seconds, Fetched 1 row(s)
>  
> For better compatibility with migration, new parameters are added to allow 
> the same behavior as Hive SQL.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-48020) Pin 'pandas==2.2.2'

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-48020:
---
Labels: pull-request-available  (was: )

> Pin 'pandas==2.2.2'
> ---
>
> Key: SPARK-48020
> URL: https://issues.apache.org/jira/browse/SPARK-48020
> Project: Spark
>  Issue Type: Test
>  Components: Project Infra, PySpark
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48020) Pin 'pandas==2.2.2'

2024-04-27 Thread Ruifeng Zheng (Jira)
Ruifeng Zheng created SPARK-48020:
-

 Summary: Pin 'pandas==2.2.2'
 Key: SPARK-48020
 URL: https://issues.apache.org/jira/browse/SPARK-48020
 Project: Spark
  Issue Type: Bug
  Components: Project Infra, PySpark
Affects Versions: 4.0.0
Reporter: Ruifeng Zheng






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-48012) SPJ: Support Transform Expressions for One Side Shuffle

2024-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-48012:
---
Labels: pull-request-available  (was: )

> SPJ: Support Transform Expressions for One Side Shuffle
> ---
>
> Key: SPARK-48012
> URL: https://issues.apache.org/jira/browse/SPARK-48012
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.3
>Reporter: Szehon Ho
>Priority: Major
>  Labels: pull-request-available
>
> SPARK-41471 allowed Spark to shuffle just one side and still conduct SPJ, if 
> the other side is KeyGroupedPartitioning.  However, the support was just for 
> a KeyGroupedPartition without any partition transform (day, year, bucket).  
> It will be useful to add support for partition transform as well, as there 
> are many tables partitioned by those transforms.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org