In [1]:
# Standard library imports
import json
import multiprocessing
import os
import re
import sys
import random
import time

# Third-party imports
import numpy as np
import pandas as pd
import pyarrow

# Pyspark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
    col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
    IntegerType, StringType, StructField, StructType,
    DoubleType, TimestampType
)
from pyspark import pandas as ps

# pyspark.pandas asks for this flag to be "1" when used together with pyarrow.
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

# CPU count reported by the OS; used below as the local[N] master width.
number_cores = int(multiprocessing.cpu_count())

# Total physical memory in bytes (POSIX sysconf query), e.g. 4015976448.
mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
# Whole gigabytes, rounded down (e.g. 3.74 -> 3). Clamped to at least 1 so that
# the "<n>g" driver-memory string built from it is never "0g" on small hosts.
memory_gb = max(1, int(mem_bytes / (1024.0**3)))


def get_spark_session(app_name: str, conf: SparkConf) -> SparkSession:
    """Create (or reuse) a local SparkSession sized from module-level values.

    The supplied ``conf`` is modified in place: the master becomes
    ``local[number_cores]``, the driver memory becomes ``memory_gb`` gigabytes,
    and adaptive query execution, Kryo serialization and eager REPL
    evaluation (up to 10000 rows) are switched on.

    Args:
        app_name: Application name handed to the session builder.
        conf: SparkConf to populate; it is mutated by this call.

    Returns:
        The session produced by ``SparkSession.builder ... getOrCreate()``.
    """
    conf.setMaster("local[{}]".format(number_cores))

    settings = {
        "spark.driver.memory": "{}g".format(memory_gb),
        "spark.sql.repl.eagerEval.enabled": "True",
        "spark.sql.adaptive.enabled": "True",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.repl.eagerEval.maxNumRows": "10000",
    }
    for key, value in settings.items():
        conf.set(key, value)

    # The log level is not a configuration property: a "sc.setLogLevel" key in
    # SparkConf has no effect on logging. Callers set it on the live context
    # instead, via spark.sparkContext.setLogLevel(...).
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


# Notebook-wide session; only ERROR-level messages are logged from here on.
spark = get_spark_session(app_name="Falk part 2", conf=SparkConf())
spark.sparkContext.setLogLevel("ERROR")



In [2]:
spark

In [3]:
def generate_ip():
    """Return a random dotted-quad string made of four integers in 0-255."""
    octets = [random.randint(0, 255) for _ in range(4)]
    return ".".join(map(str, octets))

def generate_timestamp():
    """Return a random ``pd.Timestamp`` in the years 2021-2023, to the second.

    The day is drawn from 1-28, a range that exists in every month.
    """
    # Draw order matters for reproducibility under a seed:
    # year, month, day, hour, minute, second.
    field_ranges = (
        ("year", 2021, 2023),
        ("month", 1, 12),
        ("day", 1, 28),
        ("hour", 0, 23),
        ("minute", 0, 59),
        ("second", 0, 59),
    )
    fields = {name: random.randint(low, high) for name, low, high in field_ranges}
    return pd.Timestamp(**fields)

def random_gbps():
    """Return a random float drawn uniformly from the range 0 to 10."""
    lower_bound, upper_bound = 0, 10
    return random.uniform(lower_bound, upper_bound)

# Number of rows
n = 10_000_000

# Fixed seed so that re-running this cell (or Restart & Run All) rebuilds the
# same synthetic dataset instead of a different one each time.
random.seed(42)

# Rows are built as plain Python dicts, collected into a pandas frame, and
# only then handed to Spark.
data = [
    {
        "incoming_ips": generate_ip(),
        "gbps": random_gbps(),
        "date_time": generate_timestamp(),
    }
    for _ in range(n)
]
df = spark.createDataFrame(pd.DataFrame(data))
df.show()

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


+---------------+------------------+-------------------+
|   incoming_ips|              gbps|          date_time|
+---------------+------------------+-------------------+
| 137.199.37.150| 6.820576686325688|2023-07-27 20:59:04|
|230.110.153.179| 4.882867739649452|2023-09-07 10:50:50|
| 176.169.113.43| 8.581651352042446|2023-01-18 22:00:26|
| 114.221.38.102| 9.080382058222966|2023-07-19 06:30:36|
|137.246.151.241|  2.15680432795039|2022-06-07 21:32:48|
| 138.193.210.91|1.9009988581050907|2023-08-07 05:05:09|
|  102.75.86.166|1.0749379695784234|2021-08-02 02:53:25|
|   26.123.31.25| 5.011214967454533|2023-09-03 23:55:00|
|  29.193.71.100|6.8762810023110585|2023-06-03 21:20:13|
|   117.64.154.2| 5.329844690343577|2021-06-19 05:52:12|
|  242.39.179.44| 6.629436432468285|2022-06-16 18:16:16|
| 157.60.148.200| 4.488652913930523|2021-02-01 04:34:39|
|   45.126.78.17|  5.73893095647504|2023-10-10 17:05:10|
|  197.33.71.246| 9.782816044850676|2023-12-04 03:57:47|
| 45.196.174.141|2.848304652008

In [4]:
# perf_counter() is a monotonic clock meant for measuring intervals; unlike
# time.time() it is not affected by system clock adjustments.
start_time = time.perf_counter()

# Total traffic per source address.
agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))

# Rank addresses from the highest total downwards. The window has no
# partitionBy, so the ranking runs over the whole aggregate as one ordered group.
windowRank = Window.orderBy(F.col("total_gbps").desc())
agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))

# percent_rank is 0 for the largest total, so "<= 0.80" keeps the top 80% of
# addresses; the join then restricts the raw rows to those addresses.
top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
result = df.join(top_80_ips, on="incoming_ips", how="inner").select("incoming_ips", "gbps", "date_time")
result.show()
end_time = time.perf_counter()

# The two counts below run after the timer has stopped and are not included
# in the reported duration.
print(df.count())
print(result.count())

print(f"Time taken: {end_time - start_time:.2f} seconds")

+------------+------------------+-------------------+
|incoming_ips|              gbps|          date_time|
+------------+------------------+-------------------+
|  0.0.10.199| 5.974422647561441|2021-08-08 08:12:30|
|  0.0.124.55|3.7892126749644017|2023-12-10 09:32:39|
| 0.0.174.185| 6.583322997642929|2021-06-12 07:44:39|
|  0.0.182.19| 7.240995280479487|2022-07-19 11:13:42|
| 0.0.207.180|7.6495623077017525|2021-06-18 11:15:39|
|  0.0.28.135| 9.139480449162802|2022-05-12 18:56:16|
|    0.0.4.51| 2.714399276005249|2023-07-06 00:58:26|
|  0.0.58.239| 9.019685932727338|2022-09-07 11:19:08|
|  0.1.125.40| 7.942284045976054|2022-06-04 16:35:22|
|  0.1.127.36| 5.037641395398427|2022-06-11 22:37:16|
|  0.1.127.36| 7.073916375903116|2022-11-16 13:23:54|
|  0.1.146.47|  8.37713893251334|2021-03-24 07:34:29|
| 0.1.178.188| 5.773007166692134|2023-11-13 12:06:40|
| 0.1.192.128| 7.377951825968085|2022-04-20 06:26:33|
| 0.1.199.247|7.2243833870840355|2022-09-23 05:41:03|
|  0.1.20.207| 4.42394156785

In [5]:
pip install psutil

Note: you may need to restart the kernel to use updated packages.


In [6]:
import os
import platform
import sys
import psutil


# OS and Python version
print("OS:", platform.system(), platform.release())
print("Python version:", sys.version)

# CPU information
print("CPU cores (logical):", psutil.cpu_count(logical=True))
print("CPU cores (physical):", psutil.cpu_count(logical=False))
# cpu_freq() can return None when the frequency cannot be determined on the
# host; guard it so the report does not fail with AttributeError.
cpu_freq = psutil.cpu_freq()
if cpu_freq is not None:
    print("CPU frequency:", cpu_freq.current, "MHz")
else:
    print("CPU frequency: unavailable")

# RAM information (psutil reports bytes; converted here by dividing by 1024**3)
virtual_memory = psutil.virtual_memory()
print("Total RAM:", virtual_memory.total / (1024**3), "GB")
print("Available RAM:", virtual_memory.available / (1024**3), "GB")
print("Used RAM:", virtual_memory.used / (1024**3), "GB")


OS: Linux 6.5.1-1-MANJARO
Python version: 3.11.5 | packaged by conda-forge | (main, Aug 27 2023, 03:34:09) [GCC 12.3.0]
CPU cores (logical): 16
CPU cores (physical): 8
CPU frequency: 1641.1173125000003 MHz
Total RAM: 62.1053466796875 GB
Available RAM: 41.11076354980469 GB
Used RAM: 20.30352020263672 GB
