HeartSaVioR commented on a change in pull request #30789:
URL: https://github.com/apache/spark/pull/30789#discussion_r544685998
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -1689,6 +1689,25 @@ hence the number is not same as the number of original input rows. You'd like to
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
+### State Store and task locality
+
+The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states.
+So it is more efficient to keep a state store provider running in the same executor across different streaming batches.
+Changing the location of a state store provider requires loading from checkpointed states from HDFS in the new executor.
Review comment:
Mentioning HDFS sounds slightly incorrect - even with the built-in state store provider (named HDFS) you can store the checkpoint in external storages, including object stores.
Probably `loading from checkpoint` would just work - if that doesn't seem to emphasize the overhead enough, it's OK to mention the overhead explicitly, like `the overhead of loading state from checkpoint depends on the external storage and the size of the state, which tends to hurt the latency of micro-batch run`.
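For reference, one rough way to see that per-micro-batch latency impact is to look at the trigger duration breakdown in the query progress; a minimal sketch, assuming a running `StreamingQuery` handle named `query` (the name is hypothetical):

```scala
// Inspect per-trigger timing from the most recent progress update.
// `durationMs` contains phases such as "triggerExecution" and "addBatch";
// a batch that had to reload large state from checkpoint typically shows
// a noticeably larger trigger execution time.
val progress = query.lastProgress
println(s"batch ${progress.batchId} durations (ms): ${progress.durationMs}")
```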
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -1689,6 +1689,25 @@ hence the number is not same as the number of original input rows. You'd like to
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
+### State Store and task locality
+
+The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states.
+So it is more efficient to keep a state store provider running in the same executor across different streaming batches.
Review comment:
Probably explicitly mention `microbatches`, or if we are still concerned about continuous processing, then `during the lifetime of the query`?
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -1689,6 +1689,25 @@ hence the number is not same as the number of original input rows. You'd like to
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
+### State Store and task locality
+
+The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states.
+So it is more efficient to keep a state store provider running in the same executor across different streaming batches.
+Changing the location of a state store provider requires loading from checkpointed states from HDFS in the new executor.
+
+The stateful operations in Structured Streaming queries rely on the preferred location feature of Spark's RDD to run the state store provider on the same executor.
+However, generally the preferred location is not a hard requirement and it is still possible that Spark schedules tasks to the executors other than the preferred ones.
+
+In this case, Spark will load state store providers from checkpointed states on HDFS to new executors. The state store providers run in the previous batch will not be unloaded immediately.
+If in the next batch the corresponding state store provider is scheduled on this executor again, it could reuse the previous states and save the time of loading checkpointed states.
+Spark runs a maintenance task which checks and unloads the state store providers that are inactive on the executors.
+
+For some use cases such as processing very large state data, loading new state store providers from checkpointed states can be very time-consuming and inefficient.
+By changing the Spark configurations related to task scheduling, for example `spark.locality.wait`, users can configure Spark how long to wait to launch a data-local task.
+For stateful operations in Structured Streaming, it can be used to let state store providers running on the same executors across batches. Specially, users can check the state store metrics
Review comment:
The metric is only provided by the built-in HDFS state store provider, so you may want to mention this explicitly, like `Specifically for built-in HDFS state store provider,` or just `for built-in HDFS state store provider,`.
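For reference, a minimal sketch of reading those provider-specific metrics from the query progress, assuming the built-in HDFS state store provider and a running `StreamingQuery` handle named `query` (the name is hypothetical):

```scala
// Custom metrics are reported per stateful operator; for the built-in
// HDFS-backed provider they include the loaded-map cache hit/miss counters.
query.lastProgress.stateOperators.foreach { op =>
  val hits = op.customMetrics.get("loadedMapCacheHitCount")
  val misses = op.customMetrics.get("loadedMapCacheMissCount")
  println(s"loadedMapCacheHitCount=$hits, loadedMapCacheMissCount=$misses")
}
```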
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -1689,6 +1689,25 @@ hence the number is not same as the number of original input rows. You'd like to
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
+### State Store and task locality
+
+The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states.
+So it is more efficient to keep a state store provider running in the same executor across different streaming batches.
+Changing the location of a state store provider requires loading from checkpointed states from HDFS in the new executor.
+
+The stateful operations in Structured Streaming queries rely on the preferred location feature of Spark's RDD to run the state store provider on the same executor.
+However, generally the preferred location is not a hard requirement and it is still possible that Spark schedules tasks to the executors other than the preferred ones.
+
+In this case, Spark will load state store providers from checkpointed states on HDFS to new executors. The state store providers run in the previous batch will not be unloaded immediately.
+If in the next batch the corresponding state store provider is scheduled on this executor again, it could reuse the previous states and save the time of loading checkpointed states.
+Spark runs a maintenance task which checks and unloads the state store providers that are inactive on the executors.
+
+For some use cases such as processing very large state data, loading new state store providers from checkpointed states can be very time-consuming and inefficient.
Review comment:
Just 2 cents, probably we'd want to mention this near the beginning to explain the rationale for this section (`State Store and task locality`) before explaining the details. Without this, end users might not notice why it's important to understand the details, and simply ignore the section.
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -1689,6 +1689,25 @@ hence the number is not same as the number of original input rows. You'd like to
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
+### State Store and task locality
+
+The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states.
+So it is more efficient to keep a state store provider running in the same executor across different streaming batches.
+Changing the location of a state store provider requires loading from checkpointed states from HDFS in the new executor.
+
+The stateful operations in Structured Streaming queries rely on the preferred location feature of Spark's RDD to run the state store provider on the same executor.
+However, generally the preferred location is not a hard requirement and it is still possible that Spark schedules tasks to the executors other than the preferred ones.
+
+In this case, Spark will load state store providers from checkpointed states on HDFS to new executors. The state store providers run in the previous batch will not be unloaded immediately.
+If in the next batch the corresponding state store provider is scheduled on this executor again, it could reuse the previous states and save the time of loading checkpointed states.
+Spark runs a maintenance task which checks and unloads the state store providers that are inactive on the executors.
+
+For some use cases such as processing very large state data, loading new state store providers from checkpointed states can be very time-consuming and inefficient.
+By changing the Spark configurations related to task scheduling, for example `spark.locality.wait`, users can configure Spark how long to wait to launch a data-local task.
+For stateful operations in Structured Streaming, it can be used to let state store providers running on the same executors across batches. Specially, users can check the state store metrics
+such as `loadedMapCacheHitCount` and `loadedMapCacheMissCount`. Ideally, it is best if cache missing count is zero that means Spark won't spend time on loading checkpointed state.
+User can increase Spark locality waitting configurations to avoid loading state store providers in different executors across batches.
Review comment:
nit: waitting -> waiting
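For context on the locality waiting configuration discussed above, a minimal sketch of raising `spark.locality.wait` when building the session so tasks are more likely to stay on the executor that already hosts the state store provider (the app name and the `10s` value are illustrative, not recommendations):

```scala
import org.apache.spark.sql.SparkSession

// spark.locality.wait is a scheduler-level config, so set it before the
// SparkContext starts (here via the session builder, or equivalently with
// --conf spark.locality.wait=10s on spark-submit).
val spark = SparkSession.builder()
  .appName("stateful-streaming-app")     // hypothetical app name
  .config("spark.locality.wait", "10s")  // default is 3s; larger values trade scheduling delay for locality
  .getOrCreate()
```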
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -1689,6 +1689,25 @@ hence the number is not same as the number of original input rows. You'd like to
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
+### State Store and task locality
+
+The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states.
+So it is more efficient to keep a state store provider running in the same executor across different streaming batches.
+Changing the location of a state store provider requires loading from checkpointed states from HDFS in the new executor.
+
+The stateful operations in Structured Streaming queries rely on the preferred location feature of Spark's RDD to run the state store provider on the same executor.
Review comment:
I'd rearrange the following sentences to explain the happy case first, and then the bad case. Like...
> The stateful operations in Structured Streaming queries rely on the preferred location feature of Spark's RDD to run the state store provider on the same executor. If in the next batch the corresponding state store provider is scheduled on this executor again, it could reuse the previous states and save the time of loading checkpointed states.
>
> However, generally the preferred location is not a hard requirement and it is still possible that Spark schedules tasks to the executors other than the preferred ones. In this case, Spark will load state store providers from checkpointed states on HDFS to new executors. The state store providers run in the previous batch will not be unloaded immediately.
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -1689,6 +1689,25 @@ hence the number is not same as the number of original input rows. You'd like to
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
+### State Store and task locality
+
+The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states.
+So it is more efficient to keep a state store provider running in the same executor across different streaming batches.
+Changing the location of a state store provider requires loading from checkpointed states from HDFS in the new executor.
+
+The stateful operations in Structured Streaming queries rely on the preferred location feature of Spark's RDD to run the state store provider on the same executor.
+However, generally the preferred location is not a hard requirement and it is still possible that Spark schedules tasks to the executors other than the preferred ones.
+
+In this case, Spark will load state store providers from checkpointed states on HDFS to new executors. The state store providers run in the previous batch will not be unloaded immediately.
Review comment:
same here. let's avoid mentioning HDFS.
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -1689,6 +1689,25 @@ hence the number is not same as the number of original input rows. You'd like to
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
+### State Store and task locality
+
+The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states.
+So it is more efficient to keep a state store provider running in the same executor across different streaming batches.
+Changing the location of a state store provider requires loading from checkpointed states from HDFS in the new executor.
+
+The stateful operations in Structured Streaming queries rely on the preferred location feature of Spark's RDD to run the state store provider on the same executor.
+However, generally the preferred location is not a hard requirement and it is still possible that Spark schedules tasks to the executors other than the preferred ones.
+
+In this case, Spark will load state store providers from checkpointed states on HDFS to new executors. The state store providers run in the previous batch will not be unloaded immediately.
+If in the next batch the corresponding state store provider is scheduled on this executor again, it could reuse the previous states and save the time of loading checkpointed states.
+Spark runs a maintenance task which checks and unloads the state store providers that are inactive on the executors.
+
+For some use cases such as processing very large state data, loading new state store providers from checkpointed states can be very time-consuming and inefficient.
+By changing the Spark configurations related to task scheduling, for example `spark.locality.wait`, users can configure Spark how long to wait to launch a data-local task.
+For stateful operations in Structured Streaming, it can be used to let state store providers running on the same executors across batches. Specially, users can check the state store metrics
+such as `loadedMapCacheHitCount` and `loadedMapCacheMissCount`. Ideally, it is best if cache missing count is zero that means Spark won't spend time on loading checkpointed state.
Review comment:
I don't remember the exact case, but could you please do a quick check whether loadedMapCacheMissCount also increases for a rerunning streaming query? (I roughly remember the value would increase in a case which is not abnormal.) If so, it's probably better to not mention `zero` but `minimized`.
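For what it's worth, one way to check how the miss counter behaves across batches (including the expected bump right after a query restart) is a small listener; a sketch assuming the built-in HDFS state store provider, with the class name being hypothetical:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Logs the loaded-map cache miss count per micro-batch so it is easy to see
// whether misses stay minimized after the first batches following a (re)start.
class StateStoreCacheListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.stateOperators.foreach { op =>
      val misses = op.customMetrics.get("loadedMapCacheMissCount")
      if (misses != null && misses.longValue() > 0L) {
        println(s"batch ${event.progress.batchId}: loadedMapCacheMissCount=$misses")
      }
    }
  }
}

// Register on the active SparkSession before starting the query:
// spark.streams.addListener(new StateStoreCacheListener())
```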