HyukjinKwon commented on code in PR #38066:
URL: https://github.com/apache/spark/pull/38066#discussion_r985388652
##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
# Split the lines into words, retaining timestamps, each word become a
sessionId
events = lines.select(
explode(split(lines.value, " ")).alias("sessionId"),
- lines.timestamp.cast("long"),
+ lines.timestamp,
)
# Type of output records.
session_schema = StructType(
[
StructField("sessionId", StringType()),
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
# Type of group state.
# Omit the session id in the state since it is available as group key
session_state_schema = StructType(
[
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
def func(
- key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+ key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
) -> Iterable[pd.DataFrame]:
if state.hasTimedOut:
count, start, end = state.get
state.remove()
+ (session_id,) = key
yield pd.DataFrame(
{
- "sessionId": [key[0]],
+ "sessionId": [session_id],
"count": [count],
"start": [start],
"end": [end],
}
)
else:
- start = math.inf
- end = 0
- count = 0
+ pdf_iter = iter(pdfs)
Review Comment:
Hm, why should we do this? I think you can iterate over `pdfs` directly.
Also, I think the previous value initialization logic made sense.
This example would now fail if the function is executed with an empty input.
##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
# Split the lines into words, retaining timestamps, each word become a
sessionId
events = lines.select(
explode(split(lines.value, " ")).alias("sessionId"),
- lines.timestamp.cast("long"),
+ lines.timestamp,
)
# Type of output records.
session_schema = StructType(
[
StructField("sessionId", StringType()),
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
# Type of group state.
# Omit the session id in the state since it is available as group key
session_state_schema = StructType(
[
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
def func(
- key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+ key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
) -> Iterable[pd.DataFrame]:
if state.hasTimedOut:
count, start, end = state.get
state.remove()
+ (session_id,) = key
yield pd.DataFrame(
{
- "sessionId": [key[0]],
+ "sessionId": [session_id],
"count": [count],
"start": [start],
"end": [end],
}
)
else:
- start = math.inf
- end = 0
- count = 0
+ pdf_iter = iter(pdfs)
Review Comment:
I think you can just set it to `None`
##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
# Split the lines into words, retaining timestamps, each word become a
sessionId
events = lines.select(
explode(split(lines.value, " ")).alias("sessionId"),
- lines.timestamp.cast("long"),
+ lines.timestamp,
)
# Type of output records.
session_schema = StructType(
[
StructField("sessionId", StringType()),
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
# Type of group state.
# Omit the session id in the state since it is available as group key
session_state_schema = StructType(
[
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
def func(
- key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+ key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
) -> Iterable[pd.DataFrame]:
if state.hasTimedOut:
count, start, end = state.get
state.remove()
+ (session_id,) = key
yield pd.DataFrame(
{
- "sessionId": [key[0]],
+ "sessionId": [session_id],
"count": [count],
"start": [start],
"end": [end],
}
)
else:
- start = math.inf
- end = 0
- count = 0
+ pdf_iter = iter(pdfs)
Review Comment:
Hm, when the timeout is reached, it goes to the branch above, so technically it
won't be. Okay, LGTM, but can we avoid the `iter(pdfs)` conversion? It's already an
iterator.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]