chaoqin-li1123 commented on code in PR #38066:
URL: https://github.com/apache/spark/pull/38066#discussion_r985390460


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
     # Split the lines into words, retaining timestamps, each word become a 
sessionId
     events = lines.select(
         explode(split(lines.value, " ")).alias("sessionId"),
-        lines.timestamp.cast("long"),
+        lines.timestamp,
     )
 
     # Type of output records.
     session_schema = StructType(
         [
             StructField("sessionId", StringType()),
             StructField("count", LongType()),
-            StructField("start", LongType()),
-            StructField("end", LongType()),
+            StructField("start", TimestampType()),
+            StructField("end", TimestampType()),
         ]
     )
     # Type of group state.
     # Omit the session id in the state since it is available as group key
     session_state_schema = StructType(
         [
             StructField("count", LongType()),
-            StructField("start", LongType()),
-            StructField("end", LongType()),
+            StructField("start", TimestampType()),
+            StructField("end", TimestampType()),
         ]
     )
 
     def func(
-        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+        key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
     ) -> Iterable[pd.DataFrame]:
         if state.hasTimedOut:
             count, start, end = state.get
             state.remove()
+            (session_id,) = key
             yield pd.DataFrame(
                 {
-                    "sessionId": [key[0]],
+                    "sessionId": [session_id],
                     "count": [count],
                     "start": [start],
                     "end": [end],
                 }
             )
         else:
-            start = math.inf
-            end = 0
-            count = 0
+            pdf_iter = iter(pdfs)

Review Comment:
   Now that start and end are TimestampType, I don't know what reasonable 
min and max values for them should be.



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
     # Split the lines into words, retaining timestamps, each word become a 
sessionId
     events = lines.select(
         explode(split(lines.value, " ")).alias("sessionId"),
-        lines.timestamp.cast("long"),
+        lines.timestamp,
     )
 
     # Type of output records.
     session_schema = StructType(
         [
             StructField("sessionId", StringType()),
             StructField("count", LongType()),
-            StructField("start", LongType()),
-            StructField("end", LongType()),
+            StructField("start", TimestampType()),
+            StructField("end", TimestampType()),
         ]
     )
     # Type of group state.
     # Omit the session id in the state since it is available as group key
     session_state_schema = StructType(
         [
             StructField("count", LongType()),
-            StructField("start", LongType()),
-            StructField("end", LongType()),
+            StructField("start", TimestampType()),
+            StructField("end", TimestampType()),
         ]
     )
 
     def func(
-        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+        key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
     ) -> Iterable[pd.DataFrame]:
         if state.hasTimedOut:
             count, start, end = state.get
             state.remove()
+            (session_id,) = key
             yield pd.DataFrame(
                 {
-                    "sessionId": [key[0]],
+                    "sessionId": [session_id],
                     "count": [count],
                     "start": [start],
                     "end": [end],
                 }
             )
         else:
-            start = math.inf
-            end = 0
-            count = 0
+            pdf_iter = iter(pdfs)

Review Comment:
   Now that start and end are TimestampType, I don't know what reasonable 
initial values for them should be.



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
     # Split the lines into words, retaining timestamps, each word become a 
sessionId
     events = lines.select(
         explode(split(lines.value, " ")).alias("sessionId"),
-        lines.timestamp.cast("long"),
+        lines.timestamp,
     )
 
     # Type of output records.
     session_schema = StructType(
         [
             StructField("sessionId", StringType()),
             StructField("count", LongType()),
-            StructField("start", LongType()),
-            StructField("end", LongType()),
+            StructField("start", TimestampType()),
+            StructField("end", TimestampType()),
         ]
     )
     # Type of group state.
     # Omit the session id in the state since it is available as group key
     session_state_schema = StructType(
         [
             StructField("count", LongType()),
-            StructField("start", LongType()),
-            StructField("end", LongType()),
+            StructField("start", TimestampType()),
+            StructField("end", TimestampType()),
         ]
     )
 
     def func(
-        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+        key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
     ) -> Iterable[pd.DataFrame]:
         if state.hasTimedOut:
             count, start, end = state.get
             state.remove()
+            (session_id,) = key
             yield pd.DataFrame(
                 {
-                    "sessionId": [key[0]],
+                    "sessionId": [session_id],
                     "count": [count],
                     "start": [start],
                     "end": [end],
                 }
             )
         else:
-            start = math.inf
-            end = 0
-            count = 0
+            pdf_iter = iter(pdfs)

Review Comment:
     start = min(start, pdf["timestamp"].min())
     end = max(end, pdf["timestamp"].max())
   These statements expect non-empty start and end values.
   Also, can this callback ever be invoked with an empty records DataFrame?



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
     # Split the lines into words, retaining timestamps, each word become a 
sessionId
     events = lines.select(
         explode(split(lines.value, " ")).alias("sessionId"),
-        lines.timestamp.cast("long"),
+        lines.timestamp,
     )
 
     # Type of output records.
     session_schema = StructType(
         [
             StructField("sessionId", StringType()),
             StructField("count", LongType()),
-            StructField("start", LongType()),
-            StructField("end", LongType()),
+            StructField("start", TimestampType()),
+            StructField("end", TimestampType()),
         ]
     )
     # Type of group state.
     # Omit the session id in the state since it is available as group key
     session_state_schema = StructType(
         [
             StructField("count", LongType()),
-            StructField("start", LongType()),
-            StructField("end", LongType()),
+            StructField("start", TimestampType()),
+            StructField("end", TimestampType()),
         ]
     )
 
     def func(
-        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+        key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
     ) -> Iterable[pd.DataFrame]:
         if state.hasTimedOut:
             count, start, end = state.get
             state.remove()
+            (session_id,) = key
             yield pd.DataFrame(
                 {
-                    "sessionId": [key[0]],
+                    "sessionId": [session_id],
                     "count": [count],
                     "start": [start],
                     "end": [end],
                 }
             )
         else:
-            start = math.inf
-            end = 0
-            count = 0
+            pdf_iter = iter(pdfs)

Review Comment:
   It seems that the pandas UDF expects Iterable instead of Iterator as the 
input type in its signature, so I can't change the input type annotation. This 
conversion is necessary for us to take the first element and then traverse the 
rest. Technically we know pdfs is an iterator, so the conversion is unnecessary 
at runtime (I tested it locally and it worked without the conversion), but the 
mypy linter will complain without it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to