Hi Iain,

Did you manage to solve this issue? It looks like we have a similar issue, with processing time increasing every micro-batch, but only after 30 batches.
Thanks.

On Thu, Mar 3, 2016 at 4:45 PM Iain Cundy <iain.cu...@amdocs.com> wrote:

> Hi All
>
> I’m aggregating data using mapWithState with a timeout set in 1.6.0. It
> broadly works well, and by providing access to the key and the time in the
> callback it allows a much more elegant solution for time-based aggregation
> than the old updateStateByKey function.
>
> However there seems to be a problem – the size of the state and the time
> taken to iterate over it for each micro-batch keep increasing over time,
> long after the number of ‘current’ keys settles down. We start removing
> keys after just over an hour, but the size of the state keeps increasing
> in runs of over 6 hours.
>
> Essentially we start by just adding keys for our input tuples, reaching a
> peak of about 7 million keys. Then we start to output data and remove
> keys – the number of keys drops to about 5 million. We continue
> processing tuples, which adds keys, while removing the keys we no longer
> need – the number of keys fluctuates up and down between 5 million and
> 8 million.
>
> We know this, and are reasonably confident our removal of keys is
> correct, because we obtain the state with
> JavaMapWithStateDStream.stateSnapshots and count the keys.
>
> From my reading (I don’t know Scala!) of the code in
> org.apache.spark.streaming.util.StateMap.scala it seems clear that
> removed keys are only marked as deleted, and are really destroyed
> subsequently by compaction, based upon the length of the chain of delta
> maps. We’d expect the size of the state RDDs and the time taken to
> iterate over all the state to stabilize once compaction is run after we
> remove keys, but it just doesn’t happen.
>
> Is there some possible reason why compaction never gets run?
>
> I tried to use the (undocumented?) config setting
> spark.streaming.sessionByKey.deltaChainThreshold to try to control how
> often compaction is run, with:
>
> --conf spark.streaming.sessionByKey.deltaChainThreshold=2
>
> I can see it in the Spark application UI Environment page, but it doesn’t
> seem to make any difference.
>
> I have noticed that the timeout mechanism only gets invoked on every 10th
> micro-batch. I’m almost sure it isn’t a coincidence that the checkpoint
> interval is also 10 micro-batches. I assume that is an intentional
> performance optimization. However, because I have a lot of keys I have a
> large micro-batch duration, so it would make sense for me to reduce that
> factor of 10. But since I don’t call checkpoint on the state stream, I
> can’t see how to change it.
>
> Can I change the checkpoint interval somewhere? [I tried calling
> JavaMapWithStateDStream.checkpoint myself, but that evidently isn’t the
> same thing!]
>
> My initial assumption was that there is a new delta map for each
> micro-batch, but having noticed the timeout behavior I wonder if there is
> only a new delta map for each checkpoint? Or maybe there are other
> criteria?
>
> Perhaps compaction just hasn’t run before my application falls over? Can
> anyone clarify exactly when it should run?
>
> Or maybe compaction doesn’t delete old removed keys for some reason?
>
> Thank you for your attention.
>
> Cheers
> Iain Cundy
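To make the compaction question above concrete: here is a toy, self-contained Java sketch of the delta-map chain Iain describes from StateMap.scala. This is a deliberate simplification for discussion, not Spark's actual OpenHashMapBasedStateMap (all class and method names below are made up); it only models the behavior in question – remove() writes a tombstone marker rather than freeing anything, each micro-batch layers a new delta map over its parent, and removed keys are physically dropped only when the chain grows past a threshold (the role spark.streaming.sessionByKey.deltaChainThreshold appears to play) and gets compacted. Until compaction runs, physical size keeps growing even while the count of live keys is stable, which matches the symptom of stateSnapshots reporting a steady key count while the state RDDs keep growing.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a delta-map chain with tombstone deletes (NOT Spark's real
// implementation; a sketch for reasoning about the reported behavior only).
final class DeltaStateMap<K, V> {
    // Sentinel marking a removed key; real values are stored as-is.
    private static final Object REMOVED = new Object();

    private final DeltaStateMap<K, V> parent;   // older deltas, or null
    private final Map<K, Object> delta = new HashMap<>();

    DeltaStateMap(DeltaStateMap<K, V> parent) { this.parent = parent; }

    void put(K key, V value) { delta.put(key, value); }

    // Removal only records a tombstone; nothing is freed yet.
    void remove(K key) { delta.put(key, REMOVED); }

    @SuppressWarnings("unchecked")
    V get(K key) {
        Object e = delta.get(key);
        if (e == REMOVED) return null;
        if (e != null) return (V) e;
        return parent == null ? null : parent.get(key);
    }

    int chainLength() { return 1 + (parent == null ? 0 : parent.chainLength()); }

    // Entries physically held across the whole chain, tombstones included --
    // this is what keeps growing until compaction, not the live-key count.
    int physicalSize() {
        return delta.size() + (parent == null ? 0 : parent.physicalSize());
    }

    // Collapse the chain into one map, dropping tombstoned keys for real.
    DeltaStateMap<K, V> compact() {
        Map<K, Object> merged = new HashMap<>();
        collectInto(merged);                    // oldest first, newer wins
        DeltaStateMap<K, V> result = new DeltaStateMap<>(null);
        for (Map.Entry<K, Object> e : merged.entrySet()) {
            if (e.getValue() != REMOVED) {
                result.delta.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    private void collectInto(Map<K, Object> merged) {
        if (parent != null) parent.collectInto(merged);
        merged.putAll(delta);
    }

    // One "micro-batch": compact first if the chain exceeds the threshold,
    // then layer a fresh delta map for this batch's updates.
    static <K, V> DeltaStateMap<K, V> nextBatch(DeltaStateMap<K, V> prev,
                                                int deltaChainThreshold) {
        DeltaStateMap<K, V> base =
            prev.chainLength() >= deltaChainThreshold ? prev.compact() : prev;
        return new DeltaStateMap<>(base);
    }
}
```

In this model, if nextBatch (or its compaction check) is only reached once per checkpoint rather than once per micro-batch, tombstones and stale entries accumulate across many batches before any are freed – which would reproduce exactly the "state grows long after key count stabilizes" symptom, and why lowering the threshold alone might appear to make no difference.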