HeartSaVioR commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1861444940
##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -558,14 +559,19 @@ def prepare_batch3(input_path):
def test_transform_with_state_in_pandas_event_time(self):
def check_results(batch_df, batch_id):
if batch_id == 0:
- assert set(batch_df.sort("id").collect()) == {Row(id="a", timestamp="20")}
- elif batch_id == 1:
+ # check that a timer registered in the same batch is expired
Review Comment:
nit: let's add comments noting the `watermark for late event` and the
`watermark for eviction` per batch, to help verify the output. E.g. in
batch_id == 1, the watermark for eviction is 10 but the watermark for late
event is 0, hence the row with timestamp 4 is accepted. The timestamp in an
expired row follows the value of the `watermark for eviction`, so noting that
per batch is also helpful.
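A minimal plain-Python sketch (not Spark code) of the two-watermark behavior the reviewer describes: the late-event watermark decides whether an incoming row is accepted, while the eviction watermark expires timers and supplies the timestamp carried by expired rows. The function names are hypothetical, chosen only for illustration.

```python
def accept_event(event_ts, late_event_watermark):
    # A row is dropped only if it is older than the late-event watermark.
    return event_ts >= late_event_watermark


def expired_row_timestamp(eviction_watermark):
    # Per the review comment, the expired row's timestamp follows the
    # value of the eviction watermark.
    return eviction_watermark


# Batch 1 from the review comment: late-event watermark is 0,
# eviction watermark is 10, hence timestamp 4 is still accepted.
assert accept_event(4, 0)
assert expired_row_timestamp(10) == 10
```

Spelling these values out per batch in the test comments makes the expected output of `check_results` much easier to verify by hand.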
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1197,7 +1198,11 @@ def generate_data_batches(batches):
data_batches = generate_data_batches(_batches)
for k, g in groupby(data_batches, key=lambda x: x[0]):
- yield (k, g)
+ yield (TransformWithStateInPandasFuncMode.PROCESS_DATA, k, g)
Review Comment:
nit: this looks inconsistent? Here we yield the tuple with explicit `()`,
while in the class below we don't use `()`. Not a huge deal if the linter
does not complain, but while we are here (the linter is failing)...
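For context, the two `yield` styles the reviewer is contrasting are semantically identical in Python; only the parenthesization differs. A small sketch (mode/key/group values are placeholders, not the real serializer data):

```python
def with_parens():
    # Explicit tuple, as in the changed line under review.
    yield ("PROCESS_DATA", "key", [1, 2])


def without_parens():
    # Implicit tuple; the comma alone builds the same 3-tuple.
    yield "PROCESS_DATA", "key", [1, 2]


# Both generators produce the exact same value.
assert next(with_parens()) == next(without_parens())
```

Since both forms are equivalent, the nit is purely about picking one style consistently across the file.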
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
For additional commands, e-mail: [email protected]