GitHub user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/17361#discussion_r107304196
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")
- // mapGroupsWithState: Allowed only when no aggregation + Update output mode
- case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
- if (collectStreamingAggregates(plan).isEmpty) {
- if (outputMode != InternalOutputModes.Update) {
- throwError("mapGroupsWithState is not supported with " +
- s"$outputMode output mode on a streaming DataFrame/Dataset")
- } else {
- // Allowed when no aggregation + Update output mode
- }
- } else {
- throwError("mapGroupsWithState is not supported with aggregation " +
- "on a streaming DataFrame/Dataset")
- }
-
- // flatMapGroupsWithState without aggregation
- case m: FlatMapGroupsWithState
- if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
- m.outputMode match {
- case InternalOutputModes.Update =>
- if (outputMode != InternalOutputModes.Update) {
- throwError("flatMapGroupsWithState in update mode is not supported with " +
+ // mapGroupsWithState and flatMapGroupsWithState
+ case m: FlatMapGroupsWithState if m.isStreaming =>
--- End diff --
Wow, this is getting complicated...