WweiL commented on code in PR #46921:
URL: https://github.com/apache/spark/pull/46921#discussion_r1633937962
##########
python/pyspark/sql/streaming/listener.py:
##########
@@ -500,6 +504,19 @@ def fromJson(cls, j: Dict[str, Any]) ->
"StreamingQueryProgress":
else {},
)
+    def __getitem__(self, key):
+        # Before Spark 4.0, StreamingQuery.lastProgress returns a dict, which casts id and runId
+        # to string. But here they are UUID.
+        # To prevent breaking change, also cast them to string when accessed with __getitem__.
+        if key == "id" or key == "runId":
+            return str(getattr(self, key))
+        else:
+            return getattr(self, key)
+
+    def __setitem__(self, key, value):
Review Comment:
The fear is backward compatibility. This is possible in current master:
```
>>> q = spark.readStream.format("rate").load().writeStream.format("noop").start()
24/06/10 16:10:35 WARN ResolveWriteToStream: Temporary checkpoint location
created which is deleted normally when the query didn't fail:
/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-709975db-23ed-4838-b9ae-93a7ffe59183.
If it's required to delete it under any circumstances, please set
spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
know deleting temp checkpoint folder is best effort.
24/06/10 16:10:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is
not supported in streaming DataFrames/Datasets and will be disabled.
>>> p = q.lastProgress
>>> p
{'id': '44510846-29f8-4218-95cf-616efecadb05', 'runId':
'afcac0a7-424b-428b-948e-2c0fc21a43a2', 'name': None, 'timestamp':
'2024-06-10T23:10:38.257Z', 'batchId': 2, 'batchDuration': 215, 'numInputRows':
1, 'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond':
4.651162790697675, 'durationMs': {'addBatch': 30, 'commitOffsets': 82,
'getBatch': 0, 'latestOffset': 0, 'queryPlanning': 4, 'triggerExecution': 215,
'walCommit': 98}, 'stateOperators': [], 'sources': [{'description':
'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default',
'startOffset': 1, 'endOffset': 2, 'latestOffset': 2, 'numInputRows': 1,
'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond':
4.651162790697675}], 'sink': {'description':
'org.apache.spark.sql.execution.datasources.noop.NoopTable$@67a2b2a4',
'numOutputRows': 1}}
>>> q.stop()
24/06/10 16:10:42 WARN DAGScheduler: Failed to cancel job group
afcac0a7-424b-428b-948e-2c0fc21a43a2. Cannot find active jobs for it.
24/06/10 16:10:42 WARN DAGScheduler: Failed to cancel job group
afcac0a7-424b-428b-948e-2c0fc21a43a2. Cannot find active jobs for it.
>>> p["sources"]
[{'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
numPartitions=default', 'startOffset': 1, 'endOffset': 2, 'latestOffset': 2,
'numInputRows': 1, 'inputRowsPerSecond': 76.92307692307692,
'processedRowsPerSecond': 4.651162790697675}]
>>> p["id"]
'44510846-29f8-4218-95cf-616efecadb05'
>>> p["id"] = "aaaaaaa"
>>> p["id"]
'aaaaaaa'
```
This is not possible in Scala, of course, but I'm not sure whether we should keep this Python-specific behavior.
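To make the compatibility question concrete, here is a minimal, self-contained sketch (the class name `ProgressSketch` and its fields are hypothetical, not the actual `StreamingQueryProgress` implementation) of a progress object that keeps `id`/`runId` as UUIDs internally, returns strings from `__getitem__`, and also supports dict-style assignment the way the pre-4.0 dict did:

```python
import uuid


class ProgressSketch:
    """Hypothetical stand-in for a progress object that must stay
    dict-compatible with the pre-4.0 StreamingQuery.lastProgress."""

    def __init__(self):
        # Internally id/runId are UUIDs, as in the new implementation.
        self.id = uuid.uuid4()
        self.runId = uuid.uuid4()
        self.batchId = 2

    def __getitem__(self, key):
        # Cast UUID fields to str so p["id"] matches the old dict behavior.
        if key in ("id", "runId"):
            return str(getattr(self, key))
        return getattr(self, key)

    def __setitem__(self, key, value):
        # Allow p["id"] = "..." mutation, as the old dict permitted.
        setattr(self, key, value)


p = ProgressSketch()
print(isinstance(p["id"], str))  # string, like the old dict
p["id"] = "aaaaaaa"
print(p["id"])                   # mutation visible, like the old dict
```

Keeping `__setitem__` reproduces the mutability shown in the transcript above; dropping it would make `p["id"] = "aaaaaaa"` raise `TypeError`, which is the Scala-like behavior but a breaking change for any Python code that mutates the progress dict.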
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]