Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20647#discussion_r169556960
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -107,17 +106,24 @@ case class DataSourceV2Relation(
}
/**
- * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
- * to the non-streaming relation.
+ * A specialization of [[DataSourceV2Relation]] with the streaming bit set to true.
+ *
+ * Note that, this plan has a mutable reader, so Spark won't apply operator push-down for this plan,
+ * to avoid making the plan mutable. We should consolidate this plan and [[DataSourceV2Relation]]
+ * after we figure out how to apply operator push-down for streaming data sources.
--- End diff --
Currently the streaming execution creates the reader once. The reader is
mutable and contains two kinds of state (sketched below):
1. operator push-down state, e.g. the filters that have been pushed down.
2. streaming-related state, like offsets, the Kafka connection, etc.
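To make the split concrete, here is a minimal sketch with stand-in types
(`SketchMicroBatchReader`, `Filter`, `Offset` are hypothetical, not the real
v2 interfaces):

```scala
// Stand-in types for illustration only.
case class Filter(expr: String)
case class Offset(value: Long)

class SketchMicroBatchReader {
  // 1. operator push-down state: mutated by the optimizer during planning.
  private var pushedFilters: Seq[Filter] = Nil

  // 2. streaming state: mutated by the streaming engine between batches.
  private var offsetRange: Option[(Offset, Offset)] = None

  def pushFilters(filters: Seq[Filter]): Seq[Filter] = {
    pushedFilters = filters
    Nil // pretend the source can evaluate everything that was pushed
  }

  def setOffsetRange(start: Offset, end: Offset): Unit = {
    offsetRange = Some((start, end))
  }
}
```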
For continuous mode, it's fine. We create the reader, set the offsets,
construct the plan, get the physical plan, and process. We mutate the reader's
state once at the beginning and never mutate it again.
For micro-batch mode, we have a problem. We create the reader once at the
beginning, but we set the reader offsets, construct the plan, and get the
physical plan for every batch. This means we apply operator push-down to the
same reader many times, and data source v2 doesn't define what the behavior
should be in that case (see the sketch below). Thus we can't apply operator
push-down for streaming data sources.
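To illustrate the ambiguity, here is a hypothetical stand-in (again not the
real v2 API): if `pushFilters` happens to append rather than replace,
re-planning every batch duplicates push-down state on the single long-lived
reader:

```scala
object MicroBatchPushDownDemo extends App {
  class Reader {
    var pushed: Seq[String] = Nil
    // The v2 API doesn't say whether a second call should replace or append.
    def pushFilters(fs: Seq[String]): Unit = pushed = pushed ++ fs
    def setOffsetRange(start: Long, end: Long): Unit = ()
  }

  val reader = new Reader // created once, at stream start
  for (batch <- 0 until 3) {
    reader.setOffsetRange(batch * 100L, (batch + 1) * 100L)
    // Re-running the optimizer re-applies push-down to the same instance...
    reader.pushFilters(Seq("value > 0"))
  }
  // ...so after 3 batches the same filter has been pushed 3 times.
  println(reader.pushed) // List(value > 0, value > 0, value > 0)
}
```

Replace semantics would happen to work here, but nothing in the API contract
guarantees a source implements it that way.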
@marmbrus @tdas @zsxwing @jose-torres I have two proposals to support
operator push-down for the streaming relation:
1. Introduce a `reset` API on `DataSourceReader` to clear out the operator
push-down state. Then we can call `reset` for every micro-batch and safely
apply operator push-down (see the sketch after this list).
2. Do plan analyzing/optimizing/planning only once for micro-batch mode.
Theoretically this is not ideal: different micro-batches may have different
statistics, so the optimal physical plan differs between batches and we should
rerun the planner for each one. The benefit is that
analyzing/optimizing/planning can be costly, so doing it once mitigates that
cost. Adaptive execution can also help, so reusing the same physical plan is
not that bad.
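A sketch of what proposal 1 could look like; `SupportsReset` and `reset` are
proposed, hypothetical additions, not part of the existing v2 API:

```scala
// Hypothetical mix-in for proposal 1: readers that can drop push-down state.
trait SupportsReset {
  /** Clear operator push-down state only; streaming state (offsets,
   *  connections) must survive the call. */
  def reset(): Unit
}

// Per-batch driver loop under this proposal (pseudocode):
//   reader.reset()                    // drop filters pushed for the last batch
//   reader.setOffsetRange(start, end) // advance the streaming state
//   // ...rerun analyzer/optimizer/planner, which re-applies push-down...
```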
---