Hi Bjorn,

I think one is better off using percentile_approx, as it seems to be the
recommended approach for computing percentiles and can simplify the code.
I have modified your code to use percentile_approx rather than computing
the percentile manually. It would be interesting to hear ideas on this.

Here is the code:

# Standard library imports
import json
import multiprocessing
import os
import re
import sys
import random

# Third-party imports
import numpy as np
import pandas as pd
import pyarrow

# Pyspark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
    col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
    IntegerType, StringType, StructField, StructType,
    DoubleType, TimestampType
)
from pyspark import pandas as ps

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3 (3.74 truncated by int)


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.repl.eagerEval.enabled", "True"
    ).set("spark.sql.adaptive.enabled", "True").set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "10000"
    )

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My super app", SparkConf())
sc = SparkContext.getOrCreate()
sc.setLogLevel("ERROR")

def generate_ip():
    return ".".join(str(random.randint(0, 255)) for _ in range(4))

def generate_timestamp():
    return pd.Timestamp(
        year=random.randint(2021, 2023),
        month=random.randint(1, 12),
        day=random.randint(1, 28),
        hour=random.randint(0, 23),
        minute=random.randint(0, 59),
        second=random.randint(0, 59)
    )

def random_gbps():
    return random.uniform(0, 10)

# Number of rows
n = 20

data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time":
generate_timestamp()} for _ in range(n)]
df = spark.createDataFrame(pd.DataFrame(data))
df.show()

agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))

windowRank = Window.orderBy(F.col("total_gbps").desc())
agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))

# Calculate the 80th percentile value
percentile_80 = agg_df.agg(
    F.expr("percentile_approx(total_gbps, 0.8)").alias("percentile_80")
).collect()[0]["percentile_80"]

# Filter the DataFrame based on the condition
filtered_df = df.filter(df["gbps"] >= percentile_80)

# Show the filtered DataFrame
print("Filtered DataFrame")
filtered_df.show()

print(f"Total rows in data frame = {df.count()}")
print(f"Result satisfying 80% percentile = {filtered_df.count()}")

And these are the results:

+---------------+------------------+-------------------+
|   incoming_ips|              gbps|          date_time|
+---------------+------------------+-------------------+
|129.189.130.141|2.6517421918102335|2021-09-06 08:29:25|
| 215.177.39.239|1.8210013026429361|2023-10-10 17:00:13|
|  78.202.71.184| 8.060958370556456|2022-02-22 04:25:03|
|219.100.198.137|0.3449002002472945|2023-09-28 01:39:44|
|234.234.156.107|2.6187481766507013|2022-11-16 11:33:41|
|  6.223.135.194|0.3510752223686242|2022-01-24 04:13:53|
| 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
|  75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
|  163.26.238.22|   9.8999646499433|2023-01-12 17:54:44|
| 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
| 125.77.236.177|  1.17126350326476|2021-08-19 18:48:42|
|  34.103.211.39|  9.51081430594299|2023-02-05 18:39:23|
|   117.37.42.91| 1.122437784309721|2021-03-23 17:27:27|
| 108.115.42.171| 8.165187506266607|2023-07-26 03:57:50|
| 98.105.153.129| 9.284242190156004|2023-10-10 22:36:47|
| 145.35.252.142| 9.787384042283957|2022-08-26 00:53:27|
|  18.76.138.108| 6.939770760444909|2022-04-01 01:18:27|
|    31.33.71.26| 4.820947188427366|2021-06-10 22:02:51|
|    135.22.8.38| 9.587849542001745|2021-09-21 15:11:59|
|104.231.110.207| 9.045897927807715|2023-06-28 06:01:00|
+---------------+------------------+-------------------+

Filtered DataFrame
+--------------+-----------------+-------------------+
|  incoming_ips|             gbps|          date_time|
+--------------+-----------------+-------------------+
| 163.26.238.22|  9.8999646499433|2023-01-12 17:54:44|
| 34.103.211.39| 9.51081430594299|2023-02-05 18:39:23|
|98.105.153.129|9.284242190156004|2023-10-10 22:36:47|
|145.35.252.142|9.787384042283957|2022-08-26 00:53:27|
|   135.22.8.38|9.587849542001745|2021-09-21 15:11:59|
+--------------+-----------------+-------------------+

Total rows in data frame = 20
Result satisfying 80% percentile = 5
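
To sanity-check the counts above without Spark, here is a minimal plain-Python sketch. It assumes a nearest-rank percentile (the ceil(n * pct / 100)-th smallest value), which is only my approximation of what percentile_approx returns on such a small sample; the helper name nearest_rank is hypothetical. The gbps values are copied from the 20-row output above. Every IP in this sample occurs once, so total_gbps equals gbps here; on real data with repeated IPs, comparing a per-row gbps with a per-IP total threshold would behave differently.

```python
import math

# gbps values copied from the 20-row sample output above
gbps = [
    2.6517421918102335, 1.8210013026429361, 8.060958370556456,
    0.3449002002472945, 2.6187481766507013, 0.3510752223686242,
    6.4071750880652765, 2.1484984272041685, 9.8999646499433,
    1.8875849709728088, 1.17126350326476, 9.51081430594299,
    1.122437784309721, 8.165187506266607, 9.284242190156004,
    9.787384042283957, 6.939770760444909, 4.820947188427366,
    9.587849542001745, 9.045897927807715,
]


def nearest_rank(values, pct):
    """Hypothetical helper: the ceil(n * pct / 100)-th smallest value."""
    ordered = sorted(values)
    rank = max(1, math.ceil(len(ordered) * pct / 100))
    return ordered[rank - 1]


# Threshold used in the code above: keeps only the most active ~20% of rows
p80 = nearest_rank(gbps, 80)
top = [v for v in gbps if v >= p80]
print(p80, len(top))    # 9.284242190156004 5

# Threshold for dropping the least active ~20% and keeping the remainder
p20 = nearest_rank(gbps, 20)
rest = [v for v in gbps if v >= p20]
print(p20, len(rest))   # 1.17126350326476 17
```

Under that assumption, the 0.8 threshold reproduces the 5 rows shown, that is, the top 20% rather than the remaining 80%. If the aim is the original request (drop the 20% least active IPs and work on the rest), a threshold at 0.2 applied to the per-IP totals, followed by a join back to df on incoming_ips, may be closer to what was asked.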

Cheers
Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 16 Sept 2023 at 11:46, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Happy Saturday coding 😁
>
> On Sat, 16 Sept 2023 at 11:30, Bjørn Jørgensen <bjornjorgen...@gmail.com>
> wrote:
>
>> Ah, yes, that's right.
>> I did have to spend some time on this one, and I was having some issues
>> with the code.
>> I have now restarted the notebook kernel and rerun it, and I get the same
>> result.
>>
>> On Sat, 16 Sep 2023 at 11:41, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Splendid code. I spotted a minor error while glancing through it.
>>>
>>> print(df.count())
>>> print(result_df.count())
>>>
>>>
>>> You have not defined result_df. I gather you meant "result"?
>>>
>>>
>>> print(result.count())
>>>
>>>
>>> That should fix it 🤔
>>>
>>> HTH
>>>
>>> On Sat, 16 Sept 2023 at 06:00, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>>> wrote:
>>>
>>>> Something like this?
>>>>
>>>>
>>>> # Standard library imports
>>>> import json
>>>> import multiprocessing
>>>> import os
>>>> import re
>>>> import sys
>>>> import random
>>>>
>>>> # Third-party imports
>>>> import numpy as np
>>>> import pandas as pd
>>>> import pyarrow
>>>>
>>>> # Pyspark imports
>>>> from pyspark import SparkConf, SparkContext
>>>> from pyspark.sql import SparkSession, functions as F, Window
>>>> from pyspark.sql.functions import (
>>>>     col, concat, concat_ws, expr, lit, trim, udf
>>>> )
>>>> from pyspark.sql.types import (
>>>>     IntegerType, StringType, StructField, StructType,
>>>>     DoubleType, TimestampType
>>>> )
>>>> from pyspark import pandas as ps
>>>>
>>>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>>>
>>>> number_cores = int(multiprocessing.cpu_count())
>>>>
>>>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
>>>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3 (3.74 truncated by int)
>>>>
>>>>
>>>> def get_spark_session(app_name: str, conf: SparkConf):
>>>>     conf.setMaster("local[{}]".format(number_cores))
>>>>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>>>>         "spark.sql.repl.eagerEval.enabled", "True"
>>>>     ).set("spark.sql.adaptive.enabled", "True").set(
>>>>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>>>>     ).set(
>>>>         "spark.sql.repl.eagerEval.maxNumRows", "10000"
>>>>     )
>>>>
>>>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>>>
>>>>
>>>> spark = get_spark_session("My super app", SparkConf())
>>>> spark.sparkContext.setLogLevel("ERROR")
>>>>
>>>>
>>>>
>>>> def generate_ip():
>>>>     return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>>>
>>>> def generate_timestamp():
>>>>     return pd.Timestamp(
>>>>         year=random.randint(2021, 2023),
>>>>         month=random.randint(1, 12),
>>>>         day=random.randint(1, 28),
>>>>         hour=random.randint(0, 23),
>>>>         minute=random.randint(0, 59),
>>>>         second=random.randint(0, 59)
>>>>     )
>>>>
>>>> def random_gbps():
>>>>     return random.uniform(0, 10)
>>>>
>>>> # Number of rows
>>>> n = 20
>>>>
>>>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), 
>>>> "date_time": generate_timestamp()} for _ in range(n)]
>>>> df = spark.createDataFrame(pd.DataFrame(data))
>>>> df.show()
>>>>
>>>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>>>
>>>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>>>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>>>
>>>> top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
>>>> result = df.join(top_80_ips, on="incoming_ips", 
>>>> how="inner").select("incoming_ips", "gbps", "date_time")
>>>> result.show()
>>>>
>>>> print(df.count())
>>>> print(result_df.count())
>>>>
>>>>
>>>> +---------------+-------------------+-------------------+
>>>> |   incoming_ips|               gbps|          date_time|
>>>> +---------------+-------------------+-------------------+
>>>> |   66.186.8.130|  5.074283124722104|2022-03-12 05:09:16|
>>>> |  155.45.76.235| 0.6736194760917324|2021-06-19 03:36:28|
>>>> | 237.51.137.200|0.43334812775057685|2022-04-27 08:08:47|
>>>> |    78.4.48.171| 7.5675453578753435|2022-08-21 18:55:48|
>>>> |  241.84.163.17| 3.5681655964070815|2021-01-24 20:39:50|
>>>> |130.255.202.138|  6.066112278135983|2023-07-07 22:26:15|
>>>> | 198.33.206.140| 1.9147905257021836|2023-03-01 04:44:14|
>>>> |  84.183.253.20|  7.707176860385722|2021-08-26 23:24:31|
>>>> |218.163.165.232|  9.458673015973213|2021-02-22 12:13:15|
>>>> |   62.57.20.153| 1.5764916247359229|2021-11-06 12:41:59|
>>>> | 245.24.168.152|0.07452805411698016|2021-06-04 16:14:36|
>>>> | 98.171.202.249|  3.546118349483626|2022-07-05 10:55:26|
>>>> |   210.5.246.85|0.02430730260109759|2022-04-08 17:26:04|
>>>> | 13.236.170.177|   2.41361938344535|2021-08-11 02:19:06|
>>>> |180.140.248.193| 0.9512956363005021|2021-06-27 18:16:58|
>>>> |  26.140.88.127|   7.51335778127692|2023-06-02 14:13:30|
>>>> |  7.118.207.252|  6.450499049816286|2022-12-11 06:36:20|
>>>> |    11.8.10.136|  8.750329246667354|2023-02-03 05:33:16|
>>>> |  232.140.56.86|  4.289740988237201|2023-02-22 20:10:09|
>>>> |   68.117.9.255|  5.384340363304169|2022-12-03 09:55:26|
>>>> +---------------+-------------------+-------------------+
>>>>
>>>> +---------------+------------------+-------------------+
>>>> |   incoming_ips|              gbps|          date_time|
>>>> +---------------+------------------+-------------------+
>>>> |   66.186.8.130| 5.074283124722104|2022-03-12 05:09:16|
>>>> |  241.84.163.17|3.5681655964070815|2021-01-24 20:39:50|
>>>> |    78.4.48.171|7.5675453578753435|2022-08-21 18:55:48|
>>>> |130.255.202.138| 6.066112278135983|2023-07-07 22:26:15|
>>>> | 198.33.206.140|1.9147905257021836|2023-03-01 04:44:14|
>>>> |  84.183.253.20| 7.707176860385722|2021-08-26 23:24:31|
>>>> |218.163.165.232| 9.458673015973213|2021-02-22 12:13:15|
>>>> |   62.57.20.153|1.5764916247359229|2021-11-06 12:41:59|
>>>> | 98.171.202.249| 3.546118349483626|2022-07-05 10:55:26|
>>>> |180.140.248.193|0.9512956363005021|2021-06-27 18:16:58|
>>>> | 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
>>>> |  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
>>>> |  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
>>>> |    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
>>>> |  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
>>>> |   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
>>>> +---------------+------------------+-------------------+
>>>>
>>>> 20
>>>> 16
>>>>
>>>>
>>>>
>>>> On Fri, 15 Sep 2023 at 20:14, ashok34...@yahoo.com.INVALID
>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>
>>>>> Hi team,
>>>>>
>>>>> I am using PySpark 3.4
>>>>>
>>>>> I have a table of a million rows with a few columns, among them the
>>>>> incoming IPs, what is known as gbps (Gigabytes per second), and the
>>>>> date and time of each incoming IP.
>>>>>
>>>>> I want to filter out the 20% least active IPs and work on the remainder
>>>>> of the data. How can I do this in PySpark?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Bjørn Jørgensen
>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>> Norge
>>>>
>>>> +47 480 94 297
>>>>
>>>
>>
>>
>
