Re: When should we cache / persist ? After or Before Actions?

2022-04-27 Thread Sean Owen
You certainly shouldn't just sprinkle them in, no, that has never been the
idea here. It can help in some cases, but is just overhead in others.
Be thoughtful about why you are adding these statements.
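
A minimal sketch of that distinction (the dataframes and paths here are invented
for illustration, assuming an active SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(10**8).selectExpr("id", "id % 100 AS key")
agg = df.groupBy("key").count()

# Reused by two actions: persisting can pay off, because the write below reads
# the cached aggregate instead of recomputing it.
agg.persist()
print(agg.count())                                        # first action fills the cache
agg.write.mode("overwrite").parquet("/tmp/agg_example")   # second action reuses it
agg.unpersist()

# Consumed by a single action: adding persist() here would be pure overhead,
# since nothing ever reads the cached copy a second time.
df.selectExpr("id * 2 AS doubled").write.mode("overwrite").parquet("/tmp/single_use")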

On Wed, Apr 27, 2022 at 11:16 AM Koert Kuipers  wrote:

> we have quite a few persist statements in our codebase, added whenever we
> reuse a dataframe.
> we noticed that they slow things down quite a bit (sometimes doubling the
> runtime) while providing little benefit, since spark already re-uses the
> shuffle files underlying the dataframe efficiently even if you don't
> persist.
> so at this point i am considering removing those persist statements...
> not sure what other people's experiences are with this
>
> ‪On Thu, Apr 21, 2022 at 9:41 AM ‫"Yuri Oleynikov (‫יורי אולייניקוב‬‎)"‬‎ <
> yur...@gmail.com> wrote:‬
>
>> Hi Sean
>>
>> Persisting/caching is useful when you’re going to reuse a dataframe, so in
>> your case no persisting/caching is required. That covers the “when”.
>>
>> The “where” is usually the point closest to where the
>> calculations/transformations are reused.
>>
>> Btw, I’m not sure caching is useful when you have a HUGE dataframe.
>> Maybe persisting will be more useful.
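
A minimal sketch of that cache/persist distinction for DataFrames (the
dataframes below are placeholders, not from the thread):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
small_df = spark.range(10**3)     # placeholder "small" dataframe
huge_df = spark.range(10**9)      # placeholder "huge" dataframe

# cache() on a DataFrame is shorthand for persist(StorageLevel.MEMORY_AND_DISK).
small_df.cache()

# For a very large dataframe, an explicit disk-only level keeps executor memory
# free for computation, at the cost of slower re-reads.
huge_df.persist(StorageLevel.DISK_ONLY)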
>>
>> Best regards
>>
>> On 21 Apr 2022, at 16:24, Sean Owen  wrote:
>>
>> 
>> You persist before actions, not after, if you want the action's outputs
>> to be persisted.
>> If anything, swap lines 2 and 3. However, there's no point in the count()
>> here, and because there is only one action following it (the write), no
>> caching is useful in that example.
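
A rough sketch of what that advice implies for the snippet quoted below (the
SQL text, paths and repartition value are placeholders):

df = spark.sql("some sql on huge dataset")

# Only one action (the write) consumes df, so no persist/cache is needed at all.
df.repartition(200).write.mode("overwrite").parquet("/tmp/output")

# If df really were consumed by more than one action, persist would go *before*
# the first action, so that the first action fills the cache for later ones:
#   df.persist()
#   df.count()                                          # materializes the cache
#   df.write.mode("overwrite").parquet("/tmp/output")   # reuses the cached data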
>>
>> On Thu, Apr 21, 2022 at 2:26 AM Sid  wrote:
>>
>>> Hi Folks,
>>>
>>> I am working with the Spark DataFrame API and doing the following:
>>>
>>> 1) df = spark.sql("some sql on huge dataset").persist()
>>> 2) df1 = df.count()
>>> 3) df.repartition().write.mode().parquet("")
>>>
>>>
>>> AFAIK, persist should be used after the count statement, if it is needed
>>> at all, since Spark is lazily evaluated and any action I call will
>>> recompute the code above, so there is no use in persisting it before the
>>> action.
>>>
>>> Therefore, something like the below should give better performance.
>>> 1) df= spark.sql("some sql on huge dataset")
>>> 2) df1 = df.count()
>>> 3) df.persist()
>>> 4) df.repartition().write.mode().parquet("")
>>>
>>> So please help me understand how it should be done and why, if I am not
>>> correct.
>>>
>>> Thanks,
>>> Sid
>>>
>>>


Re: When should we cache / persist ? After or Before Actions?

2022-04-27 Thread Koert Kuipers
we have quite a few persist statements in our codebase, added whenever we
reuse a dataframe.
we noticed that they slow things down quite a bit (sometimes doubling the
runtime) while providing little benefit, since spark already re-uses the
shuffle files underlying the dataframe efficiently even if you don't persist.
so at this point i am considering removing those persist statements...
not sure what other people's experiences are with this
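
A small sketch of the reuse pattern being described (the dataframes and paths
are made up, and the shuffle-file reuse is the behaviour reported above rather
than something this sketch proves):

# 'joined' sits behind a shuffle (the join/groupBy exchanges).
joined = orders.join(customers, "customer_id").groupBy("country").count()

# No persist: the first action writes shuffle files as a side effect...
joined.write.mode("overwrite").parquet("/tmp/by_country")

# ...and, per the observation above, a later action over the same dataframe can
# reuse those shuffle files (visible as skipped stages in the Spark UI), so an
# extra persist() mostly adds serialization and I/O overhead on top of that.
print(joined.count())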

‪On Thu, Apr 21, 2022 at 9:41 AM ‫"Yuri Oleynikov (‫יורי אולייניקוב‬‎)"‬‎ <
yur...@gmail.com> wrote:‬

> Hi Sean
>
> Persisting/caching is useful when you’re going to reuse a dataframe, so in
> your case no persisting/caching is required. That covers the “when”.
>
> The “where” is usually the point closest to where the
> calculations/transformations are reused.
>
> Btw, I’m not sure caching is useful when you have a HUGE dataframe.
> Maybe persisting will be more useful.
>
> Best regards
>
> On 21 Apr 2022, at 16:24, Sean Owen  wrote:
>
> 
> You persist before actions, not after, if you want the action's outputs to
> be persisted.
> If anything, swap lines 2 and 3. However, there's no point in the count()
> here, and because there is only one action following it (the write), no
> caching is useful in that example.
>
> On Thu, Apr 21, 2022 at 2:26 AM Sid  wrote:
>
>> Hi Folks,
>>
>> I am working with the Spark DataFrame API and doing the following:
>>
>> 1) df = spark.sql("some sql on huge dataset").persist()
>> 2) df1 = df.count()
>> 3) df.repartition().write.mode().parquet("")
>>
>>
>> AFAIK, persist should be used after the count statement, if it is needed
>> at all, since Spark is lazily evaluated and any action I call will
>> recompute the code above, so there is no use in persisting it before the
>> action.
>>
>> Therefore, something like the below should give better performance.
>> 1) df= spark.sql("some sql on huge dataset")
>> 2) df1 = df.count()
>> 3) df.persist()
>> 4) df.repartition().write.mode().parquet("")
>>
>> So please help me understand how it should be done and why, if I am not
>> correct.
>>
>> Thanks,
>> Sid
>>
>>



Re: Dealing with large number of small files

2022-04-27 Thread Sid
Yes,


It created a single JSON array of records separated by commas, and it was
generated faster as well.

On Wed, 27 Apr 2022, 13:42 Gourav Sengupta, 
wrote:

> Hi,
> did that result in valid JSON in the output file?
>
> Regards,
> Gourav Sengupta
>
> On Tue, Apr 26, 2022 at 8:18 PM Sid  wrote:
>
>> I have .txt files with JSON inside them. They are generated by some API
>> calls made by the Client.
>>
>> On Wed, Apr 27, 2022 at 12:39 AM Bjørn Jørgensen <
>> bjornjorgen...@gmail.com> wrote:
>>
>>> What is it that you have? Is it txt files or json files?
>>> Or do you have txt files with JSON inside?
>>>
>>>
>>>
>>> tir. 26. apr. 2022 kl. 20:41 skrev Sid :
>>>
 Thanks for your time, everyone :)

 Much appreciated.

 I solved it with the jq utility, since I was dealing with JSON, using the
 below command:

 find . -name '*.txt' -exec cat '{}' + | jq -s '.' > output.txt


 Thanks,

 Sid
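
A PySpark-only alternative is also possible; a rough sketch, assuming each
.txt file holds a single (possibly multi-line) JSON document and that the
output path is a placeholder:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read every small file in one pass; multiLine lets each file contain one
# pretty-printed JSON object rather than JSON Lines.
df = spark.read.option("multiLine", "true").json("Agent/*.txt")

# Collapse the many small inputs into a handful of larger output files.
df.coalesce(8).write.mode("overwrite").parquet("/tmp/merged_parquet")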


 On Tue, Apr 26, 2022 at 9:37 PM Bjørn Jørgensen <
 bjornjorgen...@gmail.com> wrote:

> and the bash script seems to read txt files not json
>
> for f in Agent/*.txt; do cat ${f} >> merged.json;done;
>
>
>
> tir. 26. apr. 2022 kl. 18:03 skrev Gourav Sengupta <
> gourav.sengu...@gmail.com>:
>
>> Hi,
>>
>> what version of spark are you using? And where is the data stored?
>>
>> I am not quite sure that just using a bash script will help, because
>> simply concatenating all the files into a single file does not necessarily
>> create valid JSON.
>>
>> Regards,
>> Gourav
>>
>> On Tue, Apr 26, 2022 at 3:44 PM Sid  wrote:
>>
>>> Hello,
>>>
>>> Can somebody help me with the below problem?
>>>
>>>
>>> https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark
>>>
>>>
>>> Thanks,
>>> Sid
>>>
>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>

>>>
>>> --
>>> Bjørn Jørgensen
>>> Vestre Aspehaug 4, 6010 Ålesund
>>> Norge
>>>
>>> +47 480 94 297
>>>
>>


[window aggregate][debug] Rows not dropping with watermark and window

2022-04-27 Thread Xavier Gervilla
Hi team,



With your help last week I was able to adapt a project I'm developing and apply
sentiment analysis and NER retrieval to streaming tweets. One of the next steps,
to make sure memory doesn't blow up, is applying windows and watermarks so that
tweets are discarded after some time. However, when checking the metric
"Aggregated Number Of Rows Dropped By Watermark" in the Spark UI, it is always 0.



This is the updated code I use to apply the Sentiment and NER prediction and to
add the timestamp value:

sentPipeline = PretrainedPipeline('analyze_sentiment')
nerPipeline = PretrainedPipeline('recognize_entities_dl')

sentPred = sentPipeline.transform(tweets)
nerPred = nerPipeline.transform(sentPred)
tsCol = nerPred.withColumn('timestamp', current_timestamp())

After applying some transformations I generate two columns with the entity
(entLab) and its sentiment (sentNum) and apply the watermark before running the
query:

finalDF = resultDF.withWatermark("timestamp", "10 minutes").\
  groupBy("entLab", window("timestamp", "5 minutes", "2 minutes")).\
  agg(avg("sentNum").alias("avgSent"), count("sentNum").alias("countEnt")).\
  select("entLab", "avgSent", "countEnt")

query = finalDF.writeStream.queryName('treemapResult').\
  foreachBatch(processBatch).outputMode("update").\
  option("checkpointLocation", "/tmp/checkpoints").start()



Each processBatch generates a plot with the selected values.
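
For reference, a minimal self-contained sketch of a watermark plus
sliding-window aggregation in the same shape, using the built-in rate source
(the derived entLab/sentNum values, thresholds and paths below are stand-ins,
not the actual pipeline):

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, avg, count, col, expr

spark = SparkSession.builder.getOrCreate()

# Stand-in stream: the rate source emits (timestamp, value) rows.
events = (spark.readStream.format("rate").option("rowsPerSecond", 10).load()
          .withColumn("entLab", expr("CAST(value % 5 AS STRING)"))
          .withColumn("sentNum", expr("CAST(value % 3 AS DOUBLE)")))

windowed = (events
            .withWatermark("timestamp", "10 minutes")
            .groupBy(col("entLab"), window(col("timestamp"), "5 minutes", "2 minutes"))
            .agg(avg("sentNum").alias("avgSent"), count("sentNum").alias("countEnt")))

# In append mode a window's result is emitted only once, after the watermark
# passes the end of that window.
query = (windowed.writeStream
         .outputMode("append")
         .format("console")
         .option("truncate", "false")
         .option("checkpointLocation", "/tmp/checkpoints-demo")
         .start())

One caveat worth noting: the watermark only drops rows whose event time lags the
maximum event time seen so far by more than the threshold, so timestamps assigned
with current_timestamp() at processing time will rarely, if ever, count as late.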



When I execute the program, memory usage mostly stays around 7 GB of RAM but
grows really slowly, and as mentioned above the number of rows dropped shown in
the Spark UI is zero. I've tried changing the output mode to append (since using
complete would be the opposite of the goal) but the result is very similar.

Is there any problem with the declaration of the watermark? And how could I
change it so that a plot is generated only after each window is finished? Right
now a plot is generated roughly every 80-90 seconds instead of every two
minutes, which is the slide interval between windows.



Thank you in advance!

Re: Dealing with large number of small files

2022-04-27 Thread Gourav Sengupta
Hi,
did that result in valid JSON in the output file?

Regards,
Gourav Sengupta

On Tue, Apr 26, 2022 at 8:18 PM Sid  wrote:

> I have .txt files with JSON inside them. They are generated by some API
> calls made by the Client.
>
> On Wed, Apr 27, 2022 at 12:39 AM Bjørn Jørgensen 
> wrote:
>
>> What is it that you have? Is it txt files or json files?
>> Or do you have txt files with JSON inside?
>>
>>
>>
>> tir. 26. apr. 2022 kl. 20:41 skrev Sid :
>>
>>> Thanks for your time, everyone :)
>>>
>>> Much appreciated.
>>>
>>> I solved it with the jq utility, since I was dealing with JSON, using the
>>> below command:
>>>
>>> find . -name '*.txt' -exec cat '{}' + | jq -s '.' > output.txt
>>>
>>>
>>> Thanks,
>>>
>>> Sid
>>>
>>>
>>> On Tue, Apr 26, 2022 at 9:37 PM Bjørn Jørgensen <
>>> bjornjorgen...@gmail.com> wrote:
>>>
 and the bash script seems to read txt files not json

 for f in Agent/*.txt; do cat ${f} >> merged.json;done;



 tir. 26. apr. 2022 kl. 18:03 skrev Gourav Sengupta <
 gourav.sengu...@gmail.com>:

> Hi,
>
> what version of spark are you using? And where is the data stored?
>
> I am not quite sure that just using a bash script will help, because simply
> concatenating all the files into a single file does not necessarily create
> valid JSON.
>
> Regards,
> Gourav
>
> On Tue, Apr 26, 2022 at 3:44 PM Sid  wrote:
>
>> Hello,
>>
>> Can somebody help me with the below problem?
>>
>>
>> https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark
>>
>>
>> Thanks,
>> Sid
>>
>

 --
 Bjørn Jørgensen
 Vestre Aspehaug 4, 6010 Ålesund
 Norge

 +47 480 94 297

>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>