percentile_approx returns the approximate percentile(s) of a column:
<https://github.com/apache/spark/pull/14868>. Its memory consumption is
bounded. The larger the accuracy parameter, the smaller the error. The
default accuracy is 10000, which matches the Hive default setting.
Choose a smaller value for a smaller memory footprint.
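
To get a feel for that trade-off, here is a back-of-the-envelope sketch in
plain Python (no Spark needed). It assumes the relation given in the Spark
documentation, that the relative error is 1.0/accuracy, and reads it as a
bound on how far the rank of the returned value can be from the requested
rank. The helper name max_rank_error is made up for illustration.

```python
def max_rank_error(n_rows: int, accuracy: int) -> float:
    """Rough worst-case rank error, assuming relative error = 1.0 / accuracy."""
    if accuracy <= 0:
        raise ValueError("accuracy must be a positive integer")
    return n_rows / accuracy


# Hypothetical table size, matching the 10 million rows mentioned in this thread.
n_rows = 10_000_000
for accuracy in (100, 10_000, 1_000_000):
    print(f"accuracy={accuracy}: rank may be off by about "
          f"{max_rank_error(n_rows, accuracy):.0f} rows")
```

So with the default accuracy, the value returned for 10 million rows could sit
roughly a thousand rows away from the exact percentile position, which is
usually fine for a cutoff like this one.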

When I run my code on a single PC with N = 10 million rows, it takes 22.52
seconds. Notebook attached.

I don't think the question asker wants only the top 20 percent returned.
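
To make the difference concrete, here is a minimal plain-Python sketch of
"drop the 20% least active IPs and keep the rest". It is not the notebook
code and it does not use Spark. The function name drop_least_active and the
sample data are invented for illustration, and it assumes that activity means
the summed gbps per IP. The rank follows the usual percent_rank definition,
(rank - 1) / (n - 1), computed in ascending order.

```python
from bisect import bisect_left
from collections import defaultdict


def drop_least_active(rows, fraction=0.2):
    """Return the rows whose IP is not among the least active `fraction` of IPs.

    rows: iterable of (ip, gbps) pairs; activity is the summed gbps per IP.
    """
    rows = list(rows)
    totals = defaultdict(float)
    for ip, gbps in rows:
        totals[ip] += gbps

    ordered = sorted(totals.values())
    n = len(ordered)
    if n < 2:
        return rows  # a single IP has nothing to be ranked against

    def percent_rank(total):
        # (rank - 1) / (n - 1) in ascending order; ties share the lowest rank
        return bisect_left(ordered, total) / (n - 1)

    keep = {ip for ip, total in totals.items() if percent_rank(total) >= fraction}
    return [(ip, gbps) for ip, gbps in rows if ip in keep]


# Ten made-up IPs with increasing traffic, plus a second record for the first IP.
sample = [(f"10.0.0.{i}", float(i)) for i in range(1, 11)]
sample.append(("10.0.0.1", 0.5))
kept = drop_least_active(sample)
print(len(sample), len(kept))
```

In PySpark the same idea would presumably be a percent_rank over a window
ordered by total_gbps ascending, keeping rows with rank >= 0.2, or
equivalently a cutoff at the 20th percentile rather than the 80th.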


On Sat, 16 Sep 2023 at 17:49, Mich Talebzadeh <
mich.talebza...@gmail.com> wrote:

> Hi Bjorn,
>
> I thought that one is better off using percentile_approx as it seems to be
> the recommended approach for computing percentiles and can simplify the
> code.
> I have modified your code to use percentile_approx rather than manually
> computing it. It would be interesting to hear ideas on this.
>
> Here is the code:
>
> # Standard library imports
> import json
> import multiprocessing
> import os
> import re
> import sys
> import random
>
> # Third-party imports
> import numpy as np
> import pandas as pd
> import pyarrow
>
> # Pyspark imports
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import SparkSession, functions as F, Window
> from pyspark.sql.functions import (
>     col, concat, concat_ws, expr, lit, trim, udf
> )
> from pyspark.sql.types import (
>     IntegerType, StringType, StructField, StructType,
>     DoubleType, TimestampType
> )
> from pyspark import pandas as ps
>
> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3 (3.74 truncated by int)
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
>     conf.setMaster("local[{}]".format(number_cores))
>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>         "spark.sql.repl.eagerEval.enabled", "True"
>     ).set("spark.sql.adaptive.enabled", "True").set(
>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>     ).set(
>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>     )
>
>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
>
> spark = get_spark_session("My super app", SparkConf())
> sc = SparkContext.getOrCreate()
> sc.setLogLevel("ERROR")
>
> def generate_ip():
>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>
> def generate_timestamp():
>     return pd.Timestamp(
>         year=random.randint(2021, 2023),
>         month=random.randint(1, 12),
>         day=random.randint(1, 28),
>         hour=random.randint(0, 23),
>         minute=random.randint(0, 59),
>         second=random.randint(0, 59)
>     )
>
> def random_gbps():
>     return random.uniform(0, 10)
>
> # Number of rows
> n = 20
>
> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(),
> "date_time": generate_timestamp()} for _ in range(n)]
> df = spark.createDataFrame(pd.DataFrame(data))
> df.show()
>
> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>
> windowRank = Window.orderBy(F.col("total_gbps").desc())
> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>
> # Calculate the 80th percentile value
> percentile_80 = agg_df.agg(
>     F.expr("percentile_approx(total_gbps, 0.8)").alias("percentile_80")
> ).collect()[0]["percentile_80"]
>
> # Filter the DataFrame based on the condition
> filtered_df = df.filter(df["gbps"] >= percentile_80)
>
> # Show the filtered DataFrame
> print("Filtered DataFrame")
> filtered_df.show()
>
> print(f"Total rows in data frame = {df.count()}")
> print(f"Result satisfying 80% percentile = {filtered_df.count()}")
>
> And these are the results:
>
> +---------------+------------------+-------------------+
> |   incoming_ips|              gbps|          date_time|
> +---------------+------------------+-------------------+
> |129.189.130.141|2.6517421918102335|2021-09-06 08:29:25|
> | 215.177.39.239|1.8210013026429361|2023-10-10 17:00:13|
> |  78.202.71.184| 8.060958370556456|2022-02-22 04:25:03|
> |219.100.198.137|0.3449002002472945|2023-09-28 01:39:44|
> |234.234.156.107|2.6187481766507013|2022-11-16 11:33:41|
> |  6.223.135.194|0.3510752223686242|2022-01-24 04:13:53|
> | 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
> |  75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
> |  163.26.238.22|   9.8999646499433|2023-01-12 17:54:44|
> | 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
> | 125.77.236.177|  1.17126350326476|2021-08-19 18:48:42|
> |  34.103.211.39|  9.51081430594299|2023-02-05 18:39:23|
> |   117.37.42.91| 1.122437784309721|2021-03-23 17:27:27|
> | 108.115.42.171| 8.165187506266607|2023-07-26 03:57:50|
> | 98.105.153.129| 9.284242190156004|2023-10-10 22:36:47|
> | 145.35.252.142| 9.787384042283957|2022-08-26 00:53:27|
> |  18.76.138.108| 6.939770760444909|2022-04-01 01:18:27|
> |    31.33.71.26| 4.820947188427366|2021-06-10 22:02:51|
> |    135.22.8.38| 9.587849542001745|2021-09-21 15:11:59|
> |104.231.110.207| 9.045897927807715|2023-06-28 06:01:00|
> +---------------+------------------+-------------------+
>
> Filtered DataFrame
> +--------------+-----------------+-------------------+
> |  incoming_ips|             gbps|          date_time|
> +--------------+-----------------+-------------------+
> | 163.26.238.22|  9.8999646499433|2023-01-12 17:54:44|
> | 34.103.211.39| 9.51081430594299|2023-02-05 18:39:23|
> |98.105.153.129|9.284242190156004|2023-10-10 22:36:47|
> |145.35.252.142|9.787384042283957|2022-08-26 00:53:27|
> |   135.22.8.38|9.587849542001745|2021-09-21 15:11:59|
> +--------------+-----------------+-------------------+
>
> Total rows in data frame = 20
> Result satisfying 80% percentile = 5
>
> Cheers
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 16 Sept 2023 at 11:46, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Happy Saturday coding 😁
>>
>>
>> On Sat, 16 Sept 2023 at 11:30, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>> wrote:
>>
>>> Ah, yes, that's right.
>>> I had to spend some time on this one and was having some issues
>>> with the code.
>>> I have now restarted the notebook kernel and rerun it, and I get the
>>> same result.
>>>
>>> On Sat, 16 Sep 2023 at 11:41, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Splendid code. I spotted one minor error when glancing at it.
>>>>
>>>> print(df.count())
>>>> print(result_df.count())
>>>>
>>>>
>>>> You have not defined result_df. I gather you meant "result"?
>>>>
>>>>
>>>> print(result.count())
>>>>
>>>>
>>>> That should fix it 🤔
>>>>
>>>> HTH
>>>>
>>>>
>>>> On Sat, 16 Sept 2023 at 06:00, Bjørn Jørgensen <
>>>> bjornjorgen...@gmail.com> wrote:
>>>>
>>>>> Something like this?
>>>>>
>>>>>
>>>>> # Standard library imports
>>>>> import json
>>>>> import multiprocessing
>>>>> import os
>>>>> import re
>>>>> import sys
>>>>> import random
>>>>>
>>>>> # Third-party imports
>>>>> import numpy as np
>>>>> import pandas as pd
>>>>> import pyarrow
>>>>>
>>>>> # Pyspark imports
>>>>> from pyspark import SparkConf, SparkContext
>>>>> from pyspark.sql import SparkSession, functions as F, Window
>>>>> from pyspark.sql.functions import (
>>>>>     col, concat, concat_ws, expr, lit, trim, udf
>>>>> )
>>>>> from pyspark.sql.types import (
>>>>>     IntegerType, StringType, StructField, StructType,
>>>>>     DoubleType, TimestampType
>>>>> )
>>>>> from pyspark import pandas as ps
>>>>>
>>>>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>>>>
>>>>> number_cores = int(multiprocessing.cpu_count())
>>>>>
>>>>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
>>>>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3 (3.74 truncated by int)
>>>>>
>>>>>
>>>>> def get_spark_session(app_name: str, conf: SparkConf):
>>>>>     conf.setMaster("local[{}]".format(number_cores))
>>>>>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>>>>>         "spark.sql.repl.eagerEval.enabled", "True"
>>>>>     ).set("spark.sql.adaptive.enabled", "True").set(
>>>>>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>>>>>     ).set(
>>>>>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>>>>>     )
>>>>>
>>>>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>>>>
>>>>>
>>>>> spark = get_spark_session("My super app", SparkConf())
>>>>> spark.sparkContext.setLogLevel("ERROR")
>>>>>
>>>>>
>>>>>
>>>>> def generate_ip():
>>>>>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>>>>
>>>>> def generate_timestamp():
>>>>>     return pd.Timestamp(
>>>>>         year=random.randint(2021, 2023),
>>>>>         month=random.randint(1, 12),
>>>>>         day=random.randint(1, 28),
>>>>>         hour=random.randint(0, 23),
>>>>>         minute=random.randint(0, 59),
>>>>>         second=random.randint(0, 59)
>>>>>     )
>>>>>
>>>>> def random_gbps():
>>>>>     return random.uniform(0, 10)
>>>>>
>>>>> # Number of rows
>>>>> n = 20
>>>>>
>>>>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), 
>>>>> "date_time": generate_timestamp()} for _ in range(n)]
>>>>> df = spark.createDataFrame(pd.DataFrame(data))
>>>>> df.show()
>>>>>
>>>>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>>>>
>>>>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>>>>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>>>>
>>>>> top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
>>>>> result = df.join(top_80_ips, on="incoming_ips", 
>>>>> how="inner").select("incoming_ips", "gbps", "date_time")
>>>>> result.show()
>>>>>
>>>>> print(df.count())
>>>>> print(result_df.count())
>>>>>
>>>>>
>>>>> +---------------+-------------------+-------------------+
>>>>> |   incoming_ips|               gbps|          date_time|
>>>>> +---------------+-------------------+-------------------+
>>>>> |   66.186.8.130|  5.074283124722104|2022-03-12 05:09:16|
>>>>> |  155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
>>>>> | 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
>>>>> |    78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
>>>>> |  241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
>>>>> |130.255.202.138|  6.066112278135983|2023-07-07 22:26:15|
>>>>> | 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
>>>>> |  84.183.253.20|  7.707176860385722|2021-08-26 23:24:31|
>>>>> |218.163.165.232|  9.458673015973213|2021-02-22 12:13:15|
>>>>> |   62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
>>>>> | 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
>>>>> | 98.171.202.249|  3.546118349483626|2022-07-05 10:55:26|
>>>>> |   210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
>>>>> | 13.236.170.177|   2.41361938344535|2021-08-11 02:19:06|
>>>>> |180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
>>>>> |  26.140.88.127|   7.51335778127692|2023-06-02 14:13:30|
>>>>> |  7.118.207.252|  6.450499049816286|2022-12-11 06:36:20|
>>>>> |    11.8.10.136|  8.750329246667354|2023-02-03 05:33:16|
>>>>> |  232.140.56.86|  4.289740988237201|2023-02-22 20:10:09|
>>>>> |   68.117.9.255|  5.384340363304169|2022-12-03 09:55:26|
>>>>> +---------------+-------------------+-------------------+
>>>>>
>>>>> +---------------+------------------+-------------------+
>>>>> |   incoming_ips|              gbps|          date_time|
>>>>> +---------------+------------------+-------------------+
>>>>> |   66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
>>>>> |  241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
>>>>> |    78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
>>>>> |130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
>>>>> | 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
>>>>> |  84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
>>>>> |218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
>>>>> |   62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
>>>>> | 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
>>>>> |180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
>>>>> | 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
>>>>> |  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
>>>>> |  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
>>>>> |    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
>>>>> |  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
>>>>> |   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
>>>>> +---------------+------------------+-------------------+
>>>>>
>>>>> 20
>>>>> 16
>>>>>
>>>>>
>>>>>
>>>>> On Fri, 15 Sep 2023 at 20:14, ashok34...@yahoo.com.INVALID
>>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>>
>>>>>> Hi team,
>>>>>>
>>>>>> I am using PySpark 3.4
>>>>>>
>>>>>> I have a table of a million rows with a few columns, among them
>>>>>> incoming IPs, a value known as gbps (gigabytes per second), and the
>>>>>> date and time of each incoming IP.
>>>>>>
>>>>>> I want to filter out the 20% least active IPs and work on the
>>>>>> remainder of the data. How can I do this in PySpark?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Bjørn Jørgensen
>>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>>> Norge
>>>>>
>>>>> +47 480 94 297
>>>>>
>>>>
>>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297

Attachment: Untitled7.ipynb
