HeartSaVioR commented on code in PR #38683:
URL: https://github.com/apache/spark/pull/38683#discussion_r1027509435
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -600,7 +600,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
val df2 = spark.read.format("json")
.load(dir.getCanonicalPath + "/target/new-streaming-data-join")
// Verify self-join results
- assert(streamQuery2.lastProgress.numInputRows == 4L)
+ assert(streamQuery2.lastProgress.numInputRows == 2L)
Review Comment:
Off-topic: this is very interesting. It looks like this fix "enables"
ReusedExchange, which in turn makes ProgressReporter pick up the metric from a
single leaf node instead of two.
> Before the fix
```
== Parsed Logical Plan ==
WriteToMicroBatchDataSourceV1
FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join],
77baa2ac-cc0b-4e01-94ff-ec20c98eb29b,
[checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join,
path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join],
Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
+- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND
(info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
:- Project [name#2339, age#2340, info#2341, _metadata#2345]
: +- Project [_metadata#2345, name#2339, age#2340, info#2341,
_metadata#2345]
: +- Project [name#2517 AS name#2339, age#2518 AS age#2340,
info#2519 AS info#2341, _metadata#2529 AS _metadata#2345]
: +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
+- Project [name#2504, age#2505, info#2506, _metadata#2507]
+- Project [_metadata#2507, name#2504, age#2505, info#2506,
_metadata#2507]
+- Project [name#2523 AS name#2504, age#2524 AS age#2505,
info#2525 AS info#2506, _metadata#2530 AS _metadata#2507]
+- Relation [name#2523,age#2524,info#2525,_metadata#2530] json
== Analyzed Logical Plan ==
name: string, age: int, info: struct<id:bigint,university:string>,
_metadata:
struct<file_path:string,file_name:string,file_size:bigint,file_modification_time:timestamp>
WriteToMicroBatchDataSourceV1
FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join],
77baa2ac-cc0b-4e01-94ff-ec20c98eb29b,
[checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join,
path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join],
Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
+- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND
(info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
:- Project [name#2339, age#2340, info#2341, _metadata#2345]
: +- Project [_metadata#2345, name#2339, age#2340, info#2341,
_metadata#2345]
: +- Project [name#2517 AS name#2339, age#2518 AS age#2340,
info#2519 AS info#2341, _metadata#2529 AS _metadata#2345]
: +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
+- Project [name#2504, age#2505, info#2506, _metadata#2507]
+- Project [_metadata#2507, name#2504, age#2505, info#2506,
_metadata#2507]
+- Project [name#2523 AS name#2504, age#2524 AS age#2505,
info#2525 AS info#2506, _metadata#2530 AS _metadata#2507]
+- Relation [name#2523,age#2524,info#2525,_metadata#2530] json
== Optimized Logical Plan ==
Project [name#2517, age#2518, info#2519, _metadata#2529]
+- Join Inner, ((((name#2517 = name#2523) AND (age#2518 = age#2524)) AND
(info#2519 = info#2525)) AND (_metadata#2529 = _metadata#2530))
:- Filter (((isnotnull(name#2517) AND isnotnull(age#2518)) AND
isnotnull(info#2519)) AND isnotnull(_metadata#2529))
: +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
+- Filter (((isnotnull(name#2523) AND isnotnull(age#2524)) AND
isnotnull(info#2525)) AND isnotnull(_metadata#2530))
+- Relation [name#2523,age#2524,info#2525,_metadata#2530] json
== Physical Plan ==
*(3) Project [name#2517, age#2518, info#2519, _metadata#2529]
+- StreamingSymmetricHashJoin [name#2517, age#2518, info#2519,
_metadata#2529], [name#2523, age#2524, info#2525, _metadata#2530], Inner,
condition = [ leftOnly = null, rightOnly = null, both = null, full = null ],
state info [ checkpoint =
file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join/state,
runId = b3233731-bee2-478f-9774-3322b2f88110, opId = 0, ver = 0, numPartitions
= 5], 0, 0, state cleanup [ left = null, right = null ], 2
:- Exchange hashpartitioning(name#2517, age#2518, info#2519,
_metadata#2529, 5), ENSURE_REQUIREMENTS, [plan_id=2637]
: +- *(1) Filter (((isnotnull(name#2517) AND isnotnull(age#2518)) AND
isnotnull(info#2519)) AND isnotnull(_metadata#2529))
: +- *(1) Project [name#2517, age#2518, info#2519,
named_struct(file_path, file_path#2533, file_name, file_name#2534, file_size,
file_size#2535L, file_modification_time, file_modification_time#2536) AS
_metadata#2529]
: +- FileScan json
[name#2517,age#2518,info#2519,file_path#2533,file_name#2534,file_size#2535L,file_modification_time#2536]
Batched: false, DataFilters: [isnotnull(name#2517), isnotnull(age#2518),
isnotnull(info#2519), isnotnull(_metadata#2529)], Format: JSON, Location:
InMemoryFileIndex(1
paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b...,
PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age),
IsNotNull(info)], ReadSchema:
struct<name:string,age:int,info:struct<id:bigint,university:string>>
+- Exchange hashpartitioning(name#2523, age#2524, info#2525,
_metadata#2530, 5), ENSURE_REQUIREMENTS, [plan_id=2642]
+- *(2) Filter (((isnotnull(name#2523) AND isnotnull(age#2524)) AND
isnotnull(info#2525)) AND isnotnull(_metadata#2530))
+- *(2) Project [name#2523, age#2524, info#2525,
named_struct(file_path, file_path#2537, file_name, file_name#2538, file_size,
file_size#2539L, file_modification_time, file_modification_time#2540) AS
_metadata#2530]
+- FileScan json
[name#2523,age#2524,info#2525,file_path#2537,file_name#2538,file_size#2539L,file_modification_time#2540]
Batched: false, DataFilters: [isnotnull(name#2523), isnotnull(age#2524),
isnotnull(info#2525), isnotnull(_metadata#2530)], Format: JSON, Location:
InMemoryFileIndex(1
paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b...,
PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age),
IsNotNull(info)], ReadSchema:
struct<name:string,age:int,info:struct<id:bigint,university:string>>
```
> After the fix
```
== Parsed Logical Plan ==
WriteToMicroBatchDataSourceV1
FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join],
d8c57232-267e-436b-ad82-4cf8b7f4849b,
[checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join,
path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join],
Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
+- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND
(info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
:- Project [name#2339, age#2340, info#2341, _metadata#2345]
: +- Project [_metadata#2345, name#2339, age#2340, info#2341,
_metadata#2345]
: +- Project [name#2523 AS name#2339, age#2524 AS age#2340,
info#2525 AS info#2341, _metadata#2529 AS _metadata#2345]
: +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
+- Project [name#2504, age#2505, info#2506, _metadata#2507]
+- Project [_metadata#2507, name#2504, age#2505, info#2506,
_metadata#2507]
+- Project [name#2517 AS name#2504, age#2518 AS age#2505,
info#2519 AS info#2506, _metadata#2530 AS _metadata#2507]
+- Relation [name#2517,age#2518,info#2519,_metadata#2530] json
== Analyzed Logical Plan ==
name: string, age: int, info: struct<id:bigint,university:string>,
_metadata:
struct<file_path:string,file_name:string,file_size:bigint,file_modification_time:timestamp>
WriteToMicroBatchDataSourceV1
FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join],
d8c57232-267e-436b-ad82-4cf8b7f4849b,
[checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join,
path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join],
Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
+- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND
(info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
:- Project [name#2339, age#2340, info#2341, _metadata#2345]
: +- Project [_metadata#2345, name#2339, age#2340, info#2341,
_metadata#2345]
: +- Project [name#2523 AS name#2339, age#2524 AS age#2340,
info#2525 AS info#2341, _metadata#2529 AS _metadata#2345]
: +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
+- Project [name#2504, age#2505, info#2506, _metadata#2507]
+- Project [_metadata#2507, name#2504, age#2505, info#2506,
_metadata#2507]
+- Project [name#2517 AS name#2504, age#2518 AS age#2505,
info#2519 AS info#2506, _metadata#2530 AS _metadata#2507]
+- Relation [name#2517,age#2518,info#2519,_metadata#2530] json
== Optimized Logical Plan ==
Project [name#2523, age#2524, info#2525, _metadata#2529]
+- Join Inner, ((((name#2523 = name#2517) AND (age#2524 = age#2518)) AND
(info#2525 = info#2519)) AND (_metadata#2529 = _metadata#2530))
:- Filter ((isnotnull(name#2523) AND isnotnull(age#2524)) AND
isnotnull(info#2525))
: +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
+- Filter ((isnotnull(name#2517) AND isnotnull(age#2518)) AND
isnotnull(info#2519))
+- Relation [name#2517,age#2518,info#2519,_metadata#2530] json
== Physical Plan ==
*(3) Project [name#2523, age#2524, info#2525, _metadata#2529]
+- StreamingSymmetricHashJoin [name#2523, age#2524, info#2525,
_metadata#2529], [name#2517, age#2518, info#2519, _metadata#2530], Inner,
condition = [ leftOnly = null, rightOnly = null, both = null, full = null ],
state info [ checkpoint =
file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join/state,
runId = 649e748e-fc6d-42c0-9acd-babc7809c621, opId = 0, ver = 0, numPartitions
= 5], 0, 0, state cleanup [ left = null, right = null ], 2
:- Exchange hashpartitioning(name#2523, age#2524, info#2525,
_metadata#2529, 5), ENSURE_REQUIREMENTS, [plan_id=2637]
: +- *(1) Filter ((isnotnull(name#2523) AND isnotnull(age#2524)) AND
isnotnull(info#2525))
: +- *(1) Project [name#2523, age#2524, info#2525,
knownnotnull(named_struct(file_path, file_path#2533, file_name, file_name#2534,
file_size, file_size#2535L, file_modification_time,
file_modification_time#2536)) AS _metadata#2529]
: +- FileScan json
[name#2523,age#2524,info#2525,file_path#2533,file_name#2534,file_size#2535L,file_modification_time#2536]
Batched: false, DataFilters: [isnotnull(name#2523), isnotnull(age#2524),
isnotnull(info#2525)], Format: JSON, Location: InMemoryFileIndex(1
paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a...,
PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age),
IsNotNull(info)], ReadSchema:
struct<name:string,age:int,info:struct<id:bigint,university:string>>
+- ReusedExchange [name#2517, age#2518, info#2519, _metadata#2530],
Exchange hashpartitioning(name#2523, age#2524, info#2525, _metadata#2529, 5),
ENSURE_REQUIREMENTS, [plan_id=2637]
```
This is definitely an "improvement", but it also shows that the way we collect
metrics for DSv1 sources in microbatch execution can be affected by physical
planning as well as by optimization; it has always been somewhat fragile.
Anyway, even if the same situation occurred with DSv2, the input rows would
have been counted only once, so I'd consider the new value "correct".
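To make the arithmetic concrete, here is a minimal, purely illustrative sketch
(the `LeafScan` / `numInputRows` names are hypothetical and not Spark's actual
ProgressReporter API): summing row counts over the executed leaf scan nodes
gives 4 when the self-join plans two independent FileScans over the same
source, and 2 once ReusedExchange leaves only one of them in the executed plan.
```scala
// Hypothetical, simplified model of summing per-source input rows from the
// leaf scan nodes of the executed physical plan (names are illustrative).
object LeafMetricSketch {
  // One executed leaf scan node and the number of rows it produced.
  final case class LeafScan(source: String, numOutputRows: Long)

  def numInputRows(executedLeaves: Seq[LeafScan], source: String): Long =
    executedLeaves.filter(_.source == source).map(_.numOutputRows).sum

  def main(args: Array[String]): Unit = {
    // Before the fix: the self-join plans two independent FileScan nodes over
    // the same JSON source, so the same 2 rows are counted twice -> 4.
    val before = Seq(LeafScan("json", 2L), LeafScan("json", 2L))
    // After the fix: ReusedExchange lets the second branch reuse the first
    // Exchange's output, leaving a single executed FileScan -> 2.
    val after = Seq(LeafScan("json", 2L))

    println(numInputRows(before, "json")) // 4
    println(numInputRows(after, "json"))  // 2
  }
}
```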