WweiL commented on code in PR #46921:
URL: https://github.com/apache/spark/pull/46921#discussion_r1633937962


##########
python/pyspark/sql/streaming/listener.py:
##########
@@ -500,6 +504,19 @@ def fromJson(cls, j: Dict[str, Any]) -> 
"StreamingQueryProgress":
             else {},
         )
 
+    def __getitem__(self, key):
+        # Before Spark 4.0, StreamingQuery.lastProgress returns a dict, which 
casts id and runId
+        # to string. But here they are UUID.
+        # To prevent breaking change, also cast them to string when accessed 
with __getitem__.
+        if key == "id" or key == "runId":
+            return str(getattr(self, key))
+        else:
+            return getattr(self, key)
+
+    def __setitem__(self, key, value):

Review Comment:
   The fear is backward compatibility, this is possible in current master
   ```
   >>> q = 
spark.readStream.format("rate").load().writeStream.format("noop").start()
   24/06/10 16:10:35 WARN ResolveWriteToStream: Temporary checkpoint location 
created which is deleted normally when the query didn't fail: 
/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-709975db-23ed-4838-b9ae-93a7ffe59183.
 If it's required to delete it under any circumstances, please set 
spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to 
know deleting temp checkpoint folder is best effort.
   24/06/10 16:10:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is 
not supported in streaming DataFrames/Datasets and will be disabled.
   p =>>> p = q.lastProgress
   >>> p
   {'id': '44510846-29f8-4218-95cf-616efecadb05', 'runId': 
'afcac0a7-424b-428b-948e-2c0fc21a43a2', 'name': None, 'timestamp': 
'2024-06-10T23:10:38.257Z', 'batchId': 2, 'batchDuration': 215, 'numInputRows': 
1, 'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond': 
4.651162790697675, 'durationMs': {'addBatch': 30, 'commitOffsets': 82, 
'getBatch': 0, 'latestOffset': 0, 'queryPlanning': 4, 'triggerExecution': 215, 
'walCommit': 98}, 'stateOperators': [], 'sources': [{'description': 
'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default', 
'startOffset': 1, 'endOffset': 2, 'latestOffset': 2, 'numInputRows': 1, 
'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond': 
4.651162790697675}], 'sink': {'description': 
'org.apache.spark.sql.execution.datasources.noop.NoopTable$@67a2b2a4', 
'numOutputRows': 1}}
   >>> q.stop()
   24/06/10 16:10:42 WARN DAGScheduler: Failed to cancel job group 
afcac0a7-424b-428b-948e-2c0fc21a43a2. Cannot find active jobs for it.
   24/06/10 16:10:42 WARN DAGScheduler: Failed to cancel job group 
afcac0a7-424b-428b-948e-2c0fc21a43a2. Cannot find active jobs for it.
   >>> p["sources"]
   [{'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, 
numPartitions=default', 'startOffset': 1, 'endOffset': 2, 'latestOffset': 2, 
'numInputRows': 1, 'inputRowsPerSecond': 76.92307692307692, 
'processedRowsPerSecond': 4.651162790697675}]
   >>> p["id"]
   '44510846-29f8-4218-95cf-616efecadb05'
   >>> p["id"] = "aaaaaaa"
   >>> p["id"]
   'aaaaaaa'
   ```
   
   This is not possible in Scala of course, but not sure if we should keep this 
python specific behavior....



##########
python/pyspark/sql/streaming/listener.py:
##########
@@ -500,6 +504,19 @@ def fromJson(cls, j: Dict[str, Any]) -> 
"StreamingQueryProgress":
             else {},
         )
 
+    def __getitem__(self, key):
+        # Before Spark 4.0, StreamingQuery.lastProgress returns a dict, which 
casts id and runId
+        # to string. But here they are UUID.
+        # To prevent breaking change, also cast them to string when accessed 
with __getitem__.
+        if key == "id" or key == "runId":
+            return str(getattr(self, key))
+        else:
+            return getattr(self, key)
+
+    def __setitem__(self, key, value):

Review Comment:
   The fear is backward compatibility. This is possible in current master:
   ```
   >>> q = 
spark.readStream.format("rate").load().writeStream.format("noop").start()
   24/06/10 16:10:35 WARN ResolveWriteToStream: Temporary checkpoint location 
created which is deleted normally when the query didn't fail: 
/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-709975db-23ed-4838-b9ae-93a7ffe59183.
 If it's required to delete it under any circumstances, please set 
spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to 
know deleting temp checkpoint folder is best effort.
   24/06/10 16:10:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is 
not supported in streaming DataFrames/Datasets and will be disabled.
   p =>>> p = q.lastProgress
   >>> p
   {'id': '44510846-29f8-4218-95cf-616efecadb05', 'runId': 
'afcac0a7-424b-428b-948e-2c0fc21a43a2', 'name': None, 'timestamp': 
'2024-06-10T23:10:38.257Z', 'batchId': 2, 'batchDuration': 215, 'numInputRows': 
1, 'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond': 
4.651162790697675, 'durationMs': {'addBatch': 30, 'commitOffsets': 82, 
'getBatch': 0, 'latestOffset': 0, 'queryPlanning': 4, 'triggerExecution': 215, 
'walCommit': 98}, 'stateOperators': [], 'sources': [{'description': 
'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default', 
'startOffset': 1, 'endOffset': 2, 'latestOffset': 2, 'numInputRows': 1, 
'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond': 
4.651162790697675}], 'sink': {'description': 
'org.apache.spark.sql.execution.datasources.noop.NoopTable$@67a2b2a4', 
'numOutputRows': 1}}
   >>> q.stop()
   24/06/10 16:10:42 WARN DAGScheduler: Failed to cancel job group 
afcac0a7-424b-428b-948e-2c0fc21a43a2. Cannot find active jobs for it.
   24/06/10 16:10:42 WARN DAGScheduler: Failed to cancel job group 
afcac0a7-424b-428b-948e-2c0fc21a43a2. Cannot find active jobs for it.
   >>> p["sources"]
   [{'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, 
numPartitions=default', 'startOffset': 1, 'endOffset': 2, 'latestOffset': 2, 
'numInputRows': 1, 'inputRowsPerSecond': 76.92307692307692, 
'processedRowsPerSecond': 4.651162790697675}]
   >>> p["id"]
   '44510846-29f8-4218-95cf-616efecadb05'
   >>> p["id"] = "aaaaaaa"
   >>> p["id"]
   'aaaaaaa'
   ```
   
   This is not possible in Scala of course, but not sure if we should keep this 
python specific behavior....



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to