Thank you.
The reason for using Spark local is to test the code and, as in this case,
find the bottlenecks and fix them before I spin up a K8S cluster.

I did test it now with 16 cores and 10 files:

import time

tic = time.perf_counter()
json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
                       '/home/jovyan/notebooks/falk/test/test.json')
toc = time.perf_counter()
print(f"Func run in {toc - tic:0.4f} seconds")

Func run in 30.3695 seconds


Then I stop Spark and start it with setMaster('local[1]').
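
Roughly like this (a minimal sketch; the app name and memory setting just
mirror the get_spark_session helper quoted further down):

from pyspark.sql import SparkSession

# stop the running local[16] session, then build a new one pinned to one core
spark.stop()

spark = (
    SparkSession.builder
    .appName("Falk")
    .master("local[1]")  # one core instead of local[16]
    .config("spark.driver.memory", "{}g".format(memory_gb))
    .getOrCreate()
)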

and now

Func run in 30.8168 seconds


Which means it doesn't matter whether I run this code on one core or on a K8S
cluster with 100 cores.

So I tested the same with

from multiprocessing.pool import ThreadPool
import multiprocessing as mp


if __name__ == "__main__":
    tic = time.perf_counter()
    pool = ThreadPool(mp.cpu_count())
    opt = pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
                                          '/home/jovyan/notebooks/falk/test/test.json'))
    toc = time.perf_counter()
    print(f"Func run in {toc - tic:0.4f} seconds")

I get the same files and they are OK, but I also get this error:

TypeError                                 Traceback (most recent call last)
Input In [33], in <cell line: 5>()
      6 tic = time.perf_counter()
      7 pool = ThreadPool(mp.cpu_count())
----> 8 opt = pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test", '/home/jovyan/notebooks/falk/test/test.json'))
      9 toc = time.perf_counter()
     10 print(f"Func run in {toc - tic:0.4f} seconds")

TypeError: Pool.map() missing 1 required positional argument: 'iterable'
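
(For comparison, Pool.map expects the function object plus an iterable of
arguments, along the lines of this toy sketch; the worker and the paths are
made up for illustration:)

from multiprocessing.pool import ThreadPool

def worker(path):  # hypothetical per-file worker
    return len(path)

with ThreadPool(4) as pool:
    results = pool.map(worker, ["/tmp/a.json", "/tmp/b.json"])  # function + iterable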

So any hints on what to change? :)

Spark has the pandas API on Spark, and that is really great. I prefer the
pandas API on Spark and PySpark over plain pandas.
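
For example (a tiny sketch; the column values are made up):

import pyspark.pandas as ps

psdf = ps.DataFrame({"name": ["a", "b"], "size_kb": [10, 500]})
print(psdf.head())  # pandas-style API, executed by Spark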

On Thu, 21 Jul 2022 at 09:18, Khalid Mammadov <khalidmammad...@gmail.com> wrote:

> One quick observation is that you allocate all your local CPUs to Spark
> and then execute that app with 10 threads, i.e. 10 Spark apps, so you would
> need 160 cores in total as each will need 16 CPUs IMHO. Wouldn't that create
> a CPU bottleneck?
>
> Also, as a side note, why do you need Spark if you only use it locally?
> Spark's power can only be (mainly) observed in a cluster env.
> I have achieved great parallelism using pandas and pools on a local
> machine in the past.
>
>
> On Wed, 20 Jul 2022, 21:39 Bjørn Jørgensen, <bjornjorgen...@gmail.com>
> wrote:
>
>> I have 400k JSON files, each between 10 kB and 500 kB in size.
>> They don't have the same schema, so I have to loop over them one at a
>> time.
>>
>> This works, but it's very slow. This process takes 5 days!
>>
>> So now I have tried to run these functions in a ThreadPool, but it
>> doesn't seem to work.
>>
>>
>> *Start local Spark. The system has 16 cores and 64 GB.*
>>
>> number_cores = int(multiprocessing.cpu_count())
>>
>> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  #
>> e.g. 4015976448
>> memory_gb = int(mem_bytes/(1024.**3))  # e.g. 3.74
>>
>>
>> def get_spark_session(app_name: str, conf: SparkConf):
>>     conf.setMaster('local[{}]'.format(number_cores))
>>     conf \
>>       .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>>       .set("spark.sql.repl.eagerEval.enabled", "True") \
>>       .set("spark.sql.adaptive.enabled", "True") \
>>       .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer") \
>>       .set("spark.sql.repl.eagerEval.maxNumRows", "10000")
>>
>>     return
>> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>
>> spark = get_spark_session("Falk", SparkConf())
>>
>>
>> *Function to rename columns with special characters in their names.*
>>
>> # We take a dataframe and return a new one with required changes
>> def cleanDataFrame(df: DataFrame) -> DataFrame:
>>     # Returns a new sanitized field name (this function can be anything really)
>>     def sanitizeFieldName(s: str) -> str:
>>         return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
>>             .replace("[", "_").replace("]", "_").replace(".", "_")
>>
>>     # We call this on all fields to create a copy and to perform
>>     # any changes we might want to do to the field.
>>     def sanitizeField(field: StructField) -> StructField:
>>         field = copy(field)
>>         field.name = sanitizeFieldName(field.name)
>>         # We recursively call cleanSchema on all types
>>         field.dataType = cleanSchema(field.dataType)
>>         return field
>>
>>     def cleanSchema(dataType: DataType) -> DataType:
>>         dataType = copy(dataType)
>>         # If the type is a StructType we need to recurse; otherwise we can
>>         # return since we've reached the leaf node
>>         if isinstance(dataType, StructType):
>>             # We call our sanitizer for all top level fields
>>             dataType.fields = [sanitizeField(f) for f in dataType.fields]
>>         elif isinstance(dataType, ArrayType):
>>             dataType.elementType = cleanSchema(dataType.elementType)
>>         return dataType
>>
>>     # Now that we have the new schema we can create a new DataFrame by using
>>     # the old frame's RDD as data and the new schema as the schema for the data
>>     return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
>>
>>
>>
>> *Function to flatten out a nested dataframe.*
>>
>>
>> from pyspark.sql.types import *
>> from pyspark.sql.functions import *
>>
>>
>> def flatten_test(df, sep="_"):
>>     """Returns a flattened dataframe.
>>         .. versionadded:: x.X.X
>>
>>         Parameters
>>         ----------
>>         sep : str
>>             Delimiter for flattened columns. Default `_`
>>
>>         Notes
>>         -----
>>         Don't use `.` as `sep`.
>>         It won't work on nested data frames with more than one level.
>>         And you will have to use `columns.name`.
>>
>>         Flattening Map Types will have to find every key in the column.
>>         This can be slow.
>>
>>         Examples
>>         --------
>>
>>         data_mixed = [
>>             {
>>                 "state": "Florida",
>>                 "shortname": "FL",
>>                 "info": {"governor": "Rick Scott"},
>>                 "counties": [
>>                     {"name": "Dade", "population": 12345},
>>                     {"name": "Broward", "population": 40000},
>>                     {"name": "Palm Beach", "population": 60000},
>>                 ],
>>             },
>>             {
>>                 "state": "Ohio",
>>                 "shortname": "OH",
>>                 "info": {"governor": "John Kasich"},
>>                 "counties": [
>>                     {"name": "Summit", "population": 1234},
>>                     {"name": "Cuyahoga", "population": 1337},
>>                 ],
>>             },
>>         ]
>>
>>         data_mixed = spark.createDataFrame(data=data_mixed)
>>
>>         data_mixed.printSchema()
>>
>>         root
>>         |-- counties: array (nullable = true)
>>         |    |-- element: map (containsNull = true)
>>         |    |    |-- key: string
>>         |    |    |-- value: string (valueContainsNull = true)
>>         |-- info: map (nullable = true)
>>         |    |-- key: string
>>         |    |-- value: string (valueContainsNull = true)
>>         |-- shortname: string (nullable = true)
>>         |-- state: string (nullable = true)
>>
>>
>>         data_mixed_flat = flatten_test(df, sep=":")
>>         data_mixed_flat.printSchema()
>>         root
>>         |-- shortname: string (nullable = true)
>>         |-- state: string (nullable = true)
>>         |-- counties:name: string (nullable = true)
>>         |-- counties:population: string (nullable = true)
>>         |-- info:governor: string (nullable = true)
>>
>>
>>
>>
>>         data = [
>>             {
>>                 "id": 1,
>>                 "name": "Cole Volk",
>>                 "fitness": {"height": 130, "weight": 60},
>>             },
>>             {"name": "Mark Reg", "fitness": {"height": 130, "weight":
>> 60}},
>>             {
>>                 "id": 2,
>>                 "name": "Faye Raker",
>>                 "fitness": {"height": 130, "weight": 60},
>>             },
>>         ]
>>
>>
>>         df = spark.createDataFrame(data=data)
>>
>>         df.printSchema()
>>
>>         root
>>         |-- fitness: map (nullable = true)
>>         |    |-- key: string
>>         |    |-- value: long (valueContainsNull = true)
>>         |-- id: long (nullable = true)
>>         |-- name: string (nullable = true)
>>
>>         df_flat = flatten_test(df, sep=":")
>>
>>         df_flat.printSchema()
>>
>>         root
>>         |-- id: long (nullable = true)
>>         |-- name: string (nullable = true)
>>         |-- fitness:height: long (nullable = true)
>>         |-- fitness:weight: long (nullable = true)
>>
>>         data_struct = [
>>                 (("James",None,"Smith"),"OH","M"),
>>                 (("Anna","Rose",""),"NY","F"),
>>                 (("Julia","","Williams"),"OH","F"),
>>                 (("Maria","Anne","Jones"),"NY","M"),
>>                 (("Jen","Mary","Brown"),"NY","M"),
>>                 (("Mike","Mary","Williams"),"OH","M")
>>                 ]
>>
>>
>>         schema = StructType([
>>             StructField('name', StructType([
>>                 StructField('firstname', StringType(), True),
>>                 StructField('middlename', StringType(), True),
>>                 StructField('lastname', StringType(), True)
>>                 ])),
>>             StructField('state', StringType(), True),
>>             StructField('gender', StringType(), True)
>>             ])
>>
>>         df_struct = spark.createDataFrame(data=data_struct, schema=schema)
>>
>>         df_struct.printSchema()
>>
>>         root
>>         |-- name: struct (nullable = true)
>>         |    |-- firstname: string (nullable = true)
>>         |    |-- middlename: string (nullable = true)
>>         |    |-- lastname: string (nullable = true)
>>         |-- state: string (nullable = true)
>>         |-- gender: string (nullable = true)
>>
>>         df_struct_flat = flatten_test(df_struct, sep=":")
>>
>>         df_struct_flat.printSchema()
>>
>>         root
>>         |-- state: string (nullable = true)
>>         |-- gender: string (nullable = true)
>>         |-- name:firstname: string (nullable = true)
>>         |-- name:middlename: string (nullable = true)
>>         |-- name:lastname: string (nullable = true)
>>         """
>>     # compute Complex Fields (Arrays, Structs and Maptypes) in Schema
>>     complex_fields = dict([(field.name, field.dataType)
>>                             for field in df.schema.fields
>>                             if type(field.dataType) == ArrayType
>>                             or type(field.dataType) == StructType
>>                             or type(field.dataType) == MapType])
>>
>>     while len(complex_fields) !=0:
>>         col_name = list(complex_fields.keys())[0]
>>         #print ("Processing :"+col_name+" Type :
>> "+str(type(complex_fields[col_name])))
>>
>>         # if StructType then convert all sub element to columns.
>>         # i.e. flatten structs
>>         if (type(complex_fields[col_name]) == StructType):
>>             expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
>>                         for k in [n.name for n in complex_fields[col_name]]]
>>             df = df.select("*", *expanded).drop(col_name)
>>
>>         # if ArrayType then add the Array Elements as Rows using the
>> explode function
>>         # i.e. explode Arrays
>>         elif (type(complex_fields[col_name]) == ArrayType):
>>             df = df.withColumn(col_name, explode_outer(col_name))
>>
>>         # if MapType then convert all sub element to columns.
>>         # i.e. flatten
>>         elif (type(complex_fields[col_name]) == MapType):
>>             keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
>>             keys = list(map(lambda row: row[0], keys_df.collect()))
>>             key_cols = list(map(lambda f: col(col_name).getItem(f)
>>                                 .alias(str(col_name + sep + f)), keys))
>>             drop_column_list = [col_name]
>>             df = df.select([col_name for col_name in df.columns
>>                             if col_name not in drop_column_list] + key_cols)
>>
>>         # recompute remaining Complex Fields in Schema
>>         complex_fields = dict([(field.name, field.dataType)
>>                             for field in df.schema.fields
>>                             if type(field.dataType) == ArrayType
>>                             or type(field.dataType) == StructType
>>                             or type(field.dataType) == MapType])
>>
>>     return df
>>
>>
>> *Function to read each file, apply the functions, and save each result as
>> JSON.*
>>
>> def json_to_norm_with_null(dir_path, path_to_save):
>>     path = dir_path
>>
>>     for filename in os.listdir(path):
>>         if not filename.endswith('._stript_list.json'):
>>             continue
>>
>>
>>         fullname = os.path.join(path, filename)
>>         with open(fullname) as json_file:
>>             jsonstr = json.load(json_file)
>>
>>         df = spark.read.json(fullname)
>>         df = cleanDataFrame(df)
>>         df = flatten_test(df, sep=":")
>>         df.write.mode('append').option('compression', 'snappy') \
>>             .option("ignoreNullFields", "false").json(path_to_save)
>>
>>
>> *Function to start everything off. With hopefully 10 processes.*
>>
>> from multiprocessing.pool import ThreadPool
>> tpool = ThreadPool(processes=10)
>>
>> tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02",
>>                                  '/home/jovyan/notebooks/falk/F02.json'))
>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
