Hi Iain,
Did you manage to solve this issue?
It looks like we have a similar issue, with processing time increasing on
every micro-batch, but only after 30 batches.

Thanks.

On Thu, Mar 3, 2016 at 4:45 PM Iain Cundy <iain.cu...@amdocs.com> wrote:

> Hi All
>
>
>
> I’m aggregating data using mapWithState with a timeout set, in Spark
> 1.6.0. It broadly works well, and by providing access to the key and the
> batch time in the callback it allows a much more elegant solution for
> time-based aggregation than the old updateStateByKey function.
>
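> For reference, a minimal sketch of the pattern we use (the names and the
> 70-minute timeout are illustrative, not our real code; note that Spark 1.6
> takes Guava’s Optional):
>
>     import com.google.common.base.Optional;
>     import org.apache.spark.api.java.function.Function4;
>     import org.apache.spark.streaming.Durations;
>     import org.apache.spark.streaming.State;
>     import org.apache.spark.streaming.StateSpec;
>     import org.apache.spark.streaming.Time;
>     import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
>     import org.apache.spark.streaming.api.java.JavaPairDStream;
>
>     // The Function4 form of StateSpec.function passes the batch time and
>     // the key into the callback, which is what makes time-based
>     // aggregation so much cleaner than updateStateByKey.
>     Function4<Time, String, Optional<Long>, State<Long>, Optional<Long>>
>         agg = (time, key, value, state) -> {
>             if (state.isTimingOut()) {
>                 // Idle past the timeout; emit the final value, no update.
>                 return Optional.of(state.get());
>             }
>             long sum = (state.exists() ? state.get() : 0L) + value.or(0L);
>             state.update(sum);
>             return Optional.of(sum);
>         };
>
>     // pairStream is assumed to be a JavaPairDStream<String, Long>.
>     JavaMapWithStateDStream<String, Long, Long, Long> stateStream =
>         pairStream.mapWithState(
>             StateSpec.function(agg).timeout(Durations.minutes(70)));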
>
>
> However there seems to be a problem: the size of the state, and the time
> taken to iterate over it each micro-batch, keep increasing over time, long
> after the number of ‘current’ keys settles down. We start removing keys
> after just over an hour, but the size of the state keeps increasing
> throughout runs of over 6 hours.
>
>
>
> Essentially we start by just adding keys for our input tuples, reaching a
> peak of about 7 million keys. Then we start to output data and remove keys
> – the number of keys drops to about 5 million. We continue processing
> tuples, which adds keys, while removing the keys we no longer need – the
> number of keys fluctuates up and down between 5 million and 8 million.
>
>
>
> We know this, and are reasonably confident our removal of keys is correct,
> because we obtain the state with JavaMapWithStateDStream.stateSnapshots and
> count the keys.
>
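> Concretely, the count is just (using stateStream from the sketch above):
>
>     // stateSnapshots() exposes every (key, state) pair currently held,
>     // so counting it per micro-batch tracks the live key count.
>     stateStream.stateSnapshots().count().print();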
>
>
> From my reading (I don’t know Scala!) of the code in
> org.apache.spark.streaming.util.StateMap.scala, it seems clear that the
> removed keys are only marked as deleted and are really destroyed
> subsequently by compaction, based upon the length of the chain of delta
> maps. We’d expect the size of the state RDDs and the time taken to iterate
> over all the state to stabilize once compaction is run after we remove
> keys, but it just doesn’t happen.
>
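> To illustrate what I mean, here is a purely hypothetical Java model of the
> delta-map chain as I read it; this is not Spark’s actual code:
>
>     import java.util.HashMap;
>     import java.util.HashSet;
>     import java.util.Map;
>     import java.util.Set;
>
>     // Each micro-batch layers a new delta over its parent. Removes are
>     // tombstones; keys only stop taking up space when compaction folds
>     // the chain (gated on something like deltaChainThreshold) into one
>     // flat map.
>     class DeltaMap {
>         final Map<String, Long> updates = new HashMap<>();
>         final Set<String> tombstones = new HashSet<>();
>         final DeltaMap parent;  // chain back through older batches
>
>         DeltaMap(DeltaMap parent) { this.parent = parent; }
>
>         Long get(String key) {
>             if (tombstones.contains(key)) return null;  // marked deleted
>             Long v = updates.get(key);
>             return (v != null || parent == null) ? v : parent.get(key);
>         }
>
>         int chainLength() {
>             return parent == null ? 1 : 1 + parent.chainLength();
>         }
>
>         // Fold the chain, oldest level first, into a single map; only
>         // here are tombstoned keys genuinely destroyed.
>         DeltaMap compact() {
>             DeltaMap flat = new DeltaMap(null);
>             collectInto(flat);
>             return flat;
>         }
>
>         private void collectInto(DeltaMap flat) {
>             if (parent != null) parent.collectInto(flat);
>             flat.updates.keySet().removeAll(tombstones);
>             flat.updates.putAll(updates);
>         }
>     }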
>
>
> Is there some possible reason why compaction never gets run?
>
>
>
> I tried setting the (undocumented?) config
> spark.streaming.sessionByKey.deltaChainThreshold to control how often
> compaction is run:
>
> --conf spark.streaming.sessionByKey.deltaChainThreshold=2
>
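> As far as I can tell the same value can also be set in code when building
> the SparkConf, e.g. (the app name here is illustrative):
>
>     import org.apache.spark.SparkConf;
>
>     SparkConf conf = new SparkConf()
>         .setAppName("state-aggregation")
>         .set("spark.streaming.sessionByKey.deltaChainThreshold", "2");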
>
>
> I can see it in the Spark application UI Environment page, but it doesn’t
> seem to make any difference.
>
>
>
> I have noticed that the timeout mechanism only gets invoked on every 10th
> micro-batch. I’m almost sure it isn’t a coincidence that the checkpoint
> interval is also 10 micro-batches; I assume that is an intentional
> performance optimization. Because I have a lot of keys I use a large
> micro-batch duration, so it would make sense for me to reduce that factor
> of 10. However, since I don’t call checkpoint on the state stream myself,
> I can’t see how to change it.
>
>
>
> Can I change the checkpoint interval somewhere? [I tried calling
> JavaMapWithStateDStream.checkpoint myself, but that evidently isn’t the
> same thing!]
>
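> For the record, this is what I tried; it checkpoints the mapped output
> stream, which evidently isn’t the internal state stream (the 5-minute
> interval is illustrative):
>
>     // Sets a checkpoint interval on the mapWithState output itself; the
>     // internal state stream appears to keep its own factor-of-10 interval.
>     stateStream.checkpoint(Durations.minutes(5));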
>
>
> My initial assumption was that there is a new deltaMap for each
> micro-batch, but having noticed the timeout behavior I wonder if there is
> only a new deltaMap for each checkpoint? Or maybe there are other criteria?
>
>
>
> Perhaps compaction just hasn’t run before my application falls over? Can
> anyone clarify exactly when it should run?
>
>
>
> Or maybe compaction doesn’t delete old removed keys for some reason?
>
>
>
> Thank you for your attention.
>
>
>
> Cheers
>
> Iain Cundy
>
>
