It's actually necessary to retire keys that become "zero" or "empty", so to
speak. In your case, the key is "imageURL" and the value is a dictionary, one
of whose fields is the "count" you are maintaining. For simplicity and
illustration's sake I will assume imageURL is a string like "abc". Your
slide duration is 60 seconds and your window duration is 1800 seconds.

Now consider the following chain of events in your stream.

batch 1 : "abc"
batch 2 : "xyz"
batch 3 : "abc"

and for the rest of the stream, the keys "abc" and "xyz" never occur again.

At the end of the third batch, the generated window RDD has
{ "abc" -> count = 2, "xyz" -> count = 1 }.
When the first batch falls off after 1800 seconds, it will become
{ "abc" -> count = 1, "xyz" -> count = 1 }.
60 seconds later, when the second batch falls off, it will become
{ "abc" -> count = 1, "xyz" -> count = 0 },
and a further 60 seconds later, the third batch is removed from the window
and the new window RDD becomes
{ "abc" -> count = 0, "xyz" -> count = 0 }.

I hope you can see what is wrong with this. These keys will be held in memory
perpetually even though there is no need for them to be there. This unbounded
growth in the size of the generated window RDD is what gives rise to the
deteriorating processing time in your case.

A filter function equivalent to "count != 0" will suffice to retain only
those keys that have not become "zero".
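
For reference, this is roughly what the call looks like in PySpark. It is an
untested sketch: I use a socket text source as a stand-in for your Kafka
stream and plain integer counts instead of your dictionary values, so in your
code the filter would check something like kv[1]["count"] != 0 instead:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="WindowedLinkCounts")
    ssc = StreamingContext(sc, 30)          # 30-second batch interval
    ssc.checkpoint("/tmp/checkpoint")       # checkpointing is required when invReduceFunc is used

    # Stand-in source; replace with your Kafka DStream.
    urls = ssc.socketTextStream("localhost", 9999)
    pairs = urls.map(lambda url: (url, 1))

    counts = pairs.reduceByKeyAndWindow(
        lambda a, b: a + b,                 # reduceFunc: add counts entering the window
        lambda a, b: a - b,                 # invReduceFunc: subtract counts leaving the window
        1800,                               # window duration in seconds
        60,                                 # slide duration in seconds
        filterFunc=lambda kv: kv[1] != 0)   # drop keys whose count has fallen to zero

    counts.pprint()
    ssc.start()
    ssc.awaitTermination()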

HTH,
NB



On Thu, Jun 16, 2016 at 8:12 PM, Roshan Singh <singh.rosha...@gmail.com>
wrote:

> Hi,
> According to the docs (
> https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream.reduceByKeyAndWindow),
> filterFunc can be used to retain expiring keys. I do not want to retain any
> expiring keys, so I do not understand how this can help me stabilize it.
> Please correct me if this is not the case.
>
> I am also specifying both reduceFunc and invReduceFunc. Can you share a
> sample of the code you are using?
>
> Thanks.
>
> On Fri, Jun 17, 2016 at 3:43 AM, N B <nb.nos...@gmail.com> wrote:
>
>> We had this same issue with the reduceByKeyAndWindow API that you are
>> using. To fix this issue, you have to use a different flavor of that
>> API, specifically the two versions that allow you to pass a 'filter function'
>> to them. Putting in the filter functions helped stabilize our application
>> too.
>>
>> HTH
>> NB
>>
>>
>> On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh <singh.rosha...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>> I have a Python streaming job which is supposed to run 24x7, and I am unable
>>> to stabilize it. The job just counts the number of links shared in a 30-minute
>>> sliding window. I am using the reduceByKeyAndWindow operation with a batch
>>> interval of 30 seconds and a slide interval of 60 seconds.
>>>
>>> The Kafka queue has a rate of nearly 2200 messages/second, which can
>>> increase to 3000, but the mean is 2200.
>>>
>>> I have played around with the batch size, the slide interval, and with
>>> increasing parallelism, to no avail; these only delay the destabilization.
>>>
>>> GC time is usually between 60-100 ms.
>>>
>>> I also noticed in the Spark UI that the jobs were not being distributed to
>>> other nodes, so I configured spark.locality.wait as 100ms, after which the
>>> job is distributed properly.
>>>
>>> I have a cluster of six slaves and one master, each with 16 cores and 15 GB
>>> of RAM.
>>>
>>> Code and configuration: http://pastebin.com/93DMSiji
>>>
>>> Streaming screenshot: http://imgur.com/psNfjwJ
>>>
>>> I need help in debugging the issue. Any help will be appreciated.
>>>
>>> --
>>> Roshan Singh
>>>
>>>
>>
>
>
> --
> Roshan Singh
> http://roshansingh.in
>
