chaoqin-li1123 commented on code in PR #38066:
URL: https://github.com/apache/spark/pull/38066#discussion_r985390460
##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
# Split the lines into words, retaining timestamps, each word become a
sessionId
events = lines.select(
explode(split(lines.value, " ")).alias("sessionId"),
- lines.timestamp.cast("long"),
+ lines.timestamp,
)
# Type of output records.
session_schema = StructType(
[
StructField("sessionId", StringType()),
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
# Type of group state.
# Omit the session id in the state since it is available as group key
session_state_schema = StructType(
[
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
def func(
- key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+ key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
) -> Iterable[pd.DataFrame]:
if state.hasTimedOut:
count, start, end = state.get
state.remove()
+ (session_id,) = key
yield pd.DataFrame(
{
- "sessionId": [key[0]],
+ "sessionId": [session_id],
"count": [count],
"start": [start],
"end": [end],
}
)
else:
- start = math.inf
- end = 0
- count = 0
+ pdf_iter = iter(pdfs)
Review Comment:
Now that start and end are TimestampType, I don't know what reasonable
min and max values for them would be.
##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
# Split the lines into words, retaining timestamps, each word become a
sessionId
events = lines.select(
explode(split(lines.value, " ")).alias("sessionId"),
- lines.timestamp.cast("long"),
+ lines.timestamp,
)
# Type of output records.
session_schema = StructType(
[
StructField("sessionId", StringType()),
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
# Type of group state.
# Omit the session id in the state since it is available as group key
session_state_schema = StructType(
[
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
def func(
- key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+ key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
) -> Iterable[pd.DataFrame]:
if state.hasTimedOut:
count, start, end = state.get
state.remove()
+ (session_id,) = key
yield pd.DataFrame(
{
- "sessionId": [key[0]],
+ "sessionId": [session_id],
"count": [count],
"start": [start],
"end": [end],
}
)
else:
- start = math.inf
- end = 0
- count = 0
+ pdf_iter = iter(pdfs)
Review Comment:
Now that start and end are TimestampType, I don't know what reasonable
initial values for them would be.
##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
# Split the lines into words, retaining timestamps, each word become a
sessionId
events = lines.select(
explode(split(lines.value, " ")).alias("sessionId"),
- lines.timestamp.cast("long"),
+ lines.timestamp,
)
# Type of output records.
session_schema = StructType(
[
StructField("sessionId", StringType()),
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
# Type of group state.
# Omit the session id in the state since it is available as group key
session_state_schema = StructType(
[
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
def func(
- key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+ key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
) -> Iterable[pd.DataFrame]:
if state.hasTimedOut:
count, start, end = state.get
state.remove()
+ (session_id,) = key
yield pd.DataFrame(
{
- "sessionId": [key[0]],
+ "sessionId": [session_id],
"count": [count],
"start": [start],
"end": [end],
}
)
else:
- start = math.inf
- end = 0
- count = 0
+ pdf_iter = iter(pdfs)
Review Comment:
start = min(start, pdf["timestamp"].min())
end = max(end, pdf["timestamp"].max())
These statements expect non-empty start and end values.
Also, can this callback ever be invoked with an empty records DataFrame?
##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -71,55 +72,57 @@
# Split the lines into words, retaining timestamps, each word become a
sessionId
events = lines.select(
explode(split(lines.value, " ")).alias("sessionId"),
- lines.timestamp.cast("long"),
+ lines.timestamp,
)
# Type of output records.
session_schema = StructType(
[
StructField("sessionId", StringType()),
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
# Type of group state.
# Omit the session id in the state since it is available as group key
session_state_schema = StructType(
[
StructField("count", LongType()),
- StructField("start", LongType()),
- StructField("end", LongType()),
+ StructField("start", TimestampType()),
+ StructField("end", TimestampType()),
]
)
def func(
- key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+ key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState
) -> Iterable[pd.DataFrame]:
if state.hasTimedOut:
count, start, end = state.get
state.remove()
+ (session_id,) = key
yield pd.DataFrame(
{
- "sessionId": [key[0]],
+ "sessionId": [session_id],
"count": [count],
"start": [start],
"end": [end],
}
)
else:
- start = math.inf
- end = 0
- count = 0
+ pdf_iter = iter(pdfs)
Review Comment:
It seems that the pandas UDF expects Iterable rather than Iterator in its
input signature, so I can't change the input type annotation. The conversion
is needed so that we can take the first element and then traverse the rest.
Technically we know pdfs is an iterator, so the conversion is unnecessary at
runtime (I tested it locally and it worked without the conversion), but the
mypy linter will complain without it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]