For computing mapGroupsWithState, can you check the following:
- How many tasks are being launched in the reduce stage (that is, the stage
after the shuffle, which computes mapGroupsWithState)?
- How long is each task taking?
- How many cores does the cluster have?
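One common cause of this symptom: the reduce stage runs one task (and opens one state store, as in the log below) per shuffle partition, and spark.sql.shuffle.partitions defaults to 200 regardless of how little data each batch carries. A minimal sketch of checking and lowering it, assuming an existing SparkSession named `spark` (the value 8 is an arbitrary example, not a recommendation):

```scala
// Sketch: inspect and lower the shuffle-partition count. The number 8 is
// an arbitrary example; pick something close to your core count.
println(spark.conf.get("spark.sql.shuffle.partitions"))  // default is "200"

// Note: for a streaming query this must be set before the query is first
// started, because the state is partitioned by this number for the
// lifetime of the checkpoint.
spark.conf.set("spark.sql.shuffle.partitions", "8")
```

With 200 partitions and 3-5 records per batch, almost every task is pure state-store overhead, which would match the log excerpt below.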


On Thu, Jan 18, 2018 at 11:28 PM, chris-sw <christiaan....@semmelwise.nl>
wrote:

> Hi,
>
> I recently did some experiments with stateful structured streaming by using
> flatMapGroupsWithState. The streaming application is quite simple: it
> receives data from Kafka, feeds it to the stateful operator
> (flatMapGroupsWithState), and sinks the output to the console.
> During a test with small datasets (3-5 records per batch) I experienced
> long batch runs.
>
> Taking a look at the log I see an explosion of tasks with these log
> entries:
> -----
> 2018-01-18 13:33:46,668 [Executor task launch worker for task 287] INFO org.apache.spark.executor.Executor - Running task 85.0 in stage 3.0 (TID 287)
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider - Retrieved version 1 of HDFSStateStoreProvider[id = (op=0, part=85), dir = /tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] for update
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 0 non-empty blocks out of 1 blocks
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 0 remote fetches in 0 ms
> 2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider - Committed version 2 for HDFSStateStore[id=(op=0,part=85),dir=/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] to file /tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85/2.delta
> 2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO org.apache.spark.executor.Executor - Finished task 85.0 in stage 3.0 (TID 287). 2212 bytes result sent to driver
> -----
>
> A batch run takes approx. 5 seconds, and it seems most of that time is
> spent on tasks like the above.
> I created several apps using the non-Spark-SQL approach with mapWithState
> but never experienced these long batch runs.
>
> Has anyone seen this as well, or can anyone help me find a solution for
> these long runs?
>
> Regards,
>
> Chris
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
