So now I have tried to run this function in a ThreadPool. But it doesn't
seem to work.

[image: image.png]

---------- Forwarded message ---------
Fra: Sean Owen <sro...@gmail.com>
Date: ons. 20. jul. 2022 kl. 22:43
Subject: Re: Pyspark and multiprocessing
To: Bjørn Jørgensen <bjornjorgen...@gmail.com>


I don't think you ever say what doesn't work?

On Wed, Jul 20, 2022 at 3:40 PM Bjørn Jørgensen <bjornjorgen...@gmail.com>
wrote:

> I have 400k of JSON files. Which is between 10 kb and 500 kb in size.
> They don`t have the same schema, so I have to loop over them one at a
> time.
>
> This works, but is`s very slow. This process takes 5 days!
>
> So now I have tried to run this functions in a ThreadPool. But it don`t
> seems to work.
>
>
> *Start local spark. The system have 16 cores and 64 GB.*
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  #
> e.g. 4015976448
> memory_gb = int(mem_bytes/(1024.**3))  # e.g. 3.74
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
>     conf.setMaster('local[{}]'.format(number_cores))
>     conf \
>       .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>       .set("spark.sql.repl.eagerEval.enabled", "True") \
>       .set("spark.sql.adaptive.enabled", "True") \
>       .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer") \
>       .set("spark.sql.repl.eagerEval.maxNumRows", "10000")
>
>     return
> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
> spark = get_spark_session("Falk", SparkConf())
>
>
> *Function to rename columns with \\ *
>
> # We take a dataframe and return a new one with required changes
> def cleanDataFrame(df: DataFrame) -> DataFrame:
>     # Returns a new sanitized field name (this function can be anything
> really)
>     def sanitizeFieldName(s: str) -> str:
>         return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
>             .replace("[", "_").replace("]", "_").replace(".", "_")
>
>     # We call this on all fields to create a copy and to perform any
> changes we might
>     # want to do to the field.
>     def sanitizeField(field: StructField) -> StructField:
>         field = copy(field)
>         field.name = sanitizeFieldName(field.name)
>         # We recursively call cleanSchema on all types
>         field.dataType = cleanSchema(field.dataType)
>         return field
>
>     def cleanSchema(dataType: [DataType]) -> [DateType]:
>         dataType = copy(dataType)
>         # If the type is a StructType we need to recurse otherwise we can
> return since
>         # we've reached the leaf node
>         if isinstance(dataType, StructType):
>             # We call our sanitizer for all top level fields
>             dataType.fields = [sanitizeField(f) for f in dataType.fields]
>         elif isinstance(dataType, ArrayType):
>             dataType.elementType = cleanSchema(dataType.elementType)
>         return dataType
>
>     # Now since we have the new schema we can create a new DataFrame by
> using the old Frame's RDD as data and the new schema as the schema for the
> data
>     return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
>
>
>
> *Function to flatten out a nested dataframe.*
>
>
> from pyspark.sql.types import *
> from pyspark.sql.functions import *
>
>
> def flatten_test(df, sep="_"):
>     """Returns a flattened dataframe.
>         .. versionadded:: x.X.X
>
>         Parameters
>         ----------
>         sep : str
>             Delimiter for flatted columns. Default `_`
>
>         Notes
>         -----
>         Don`t use `.` as `sep`
>         It won't work on nested data frames with more than one level.
>         And you will have to use `columns.name`.
>
>         Flattening Map Types will have to find every key in the column.
>         This can be slow.
>
>         Examples
>         --------
>
>         data_mixed = [
>             {
>                 "state": "Florida",
>                 "shortname": "FL",
>                 "info": {"governor": "Rick Scott"},
>                 "counties": [
>                     {"name": "Dade", "population": 12345},
>                     {"name": "Broward", "population": 40000},
>                     {"name": "Palm Beach", "population": 60000},
>                 ],
>             },
>             {
>                 "state": "Ohio",
>                 "shortname": "OH",
>                 "info": {"governor": "John Kasich"},
>                 "counties": [
>                     {"name": "Summit", "population": 1234},
>                     {"name": "Cuyahoga", "population": 1337},
>                 ],
>             },
>         ]
>
>         data_mixed = spark.createDataFrame(data=data_mixed)
>
>         data_mixed.printSchema()
>
>         root
>         |-- counties: array (nullable = true)
>         |    |-- element: map (containsNull = true)
>         |    |    |-- key: string
>         |    |    |-- value: string (valueContainsNull = true)
>         |-- info: map (nullable = true)
>         |    |-- key: string
>         |    |-- value: string (valueContainsNull = true)
>         |-- shortname: string (nullable = true)
>         |-- state: string (nullable = true)
>
>
>         data_mixed_flat = flatten_test(df, sep=":")
>         data_mixed_flat.printSchema()
>         root
>         |-- shortname: string (nullable = true)
>         |-- state: string (nullable = true)
>         |-- counties:name: string (nullable = true)
>         |-- counties:population: string (nullable = true)
>         |-- info:governor: string (nullable = true)
>
>
>
>
>         data = [
>             {
>                 "id": 1,
>                 "name": "Cole Volk",
>                 "fitness": {"height": 130, "weight": 60},
>             },
>             {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
>             {
>                 "id": 2,
>                 "name": "Faye Raker",
>                 "fitness": {"height": 130, "weight": 60},
>             },
>         ]
>
>
>         df = spark.createDataFrame(data=data)
>
>         df.printSchema()
>
>         root
>         |-- fitness: map (nullable = true)
>         |    |-- key: string
>         |    |-- value: long (valueContainsNull = true)
>         |-- id: long (nullable = true)
>         |-- name: string (nullable = true)
>
>         df_flat = flatten_test(df, sep=":")
>
>         df_flat.printSchema()
>
>         root
>         |-- id: long (nullable = true)
>         |-- name: string (nullable = true)
>         |-- fitness:height: long (nullable = true)
>         |-- fitness:weight: long (nullable = true)
>
>         data_struct = [
>                 (("James",None,"Smith"),"OH","M"),
>                 (("Anna","Rose",""),"NY","F"),
>                 (("Julia","","Williams"),"OH","F"),
>                 (("Maria","Anne","Jones"),"NY","M"),
>                 (("Jen","Mary","Brown"),"NY","M"),
>                 (("Mike","Mary","Williams"),"OH","M")
>                 ]
>
>
>         schema = StructType([
>             StructField('name', StructType([
>                 StructField('firstname', StringType(), True),
>                 StructField('middlename', StringType(), True),
>                 StructField('lastname', StringType(), True)
>                 ])),
>             StructField('state', StringType(), True),
>             StructField('gender', StringType(), True)
>             ])
>
>         df_struct = spark.createDataFrame(data = data_struct, schema =
> schema)
>
>         df_struct.printSchema()
>
>         root
>         |-- name: struct (nullable = true)
>         |    |-- firstname: string (nullable = true)
>         |    |-- middlename: string (nullable = true)
>         |    |-- lastname: string (nullable = true)
>         |-- state: string (nullable = true)
>         |-- gender: string (nullable = true)
>
>         df_struct_flat = flatten_test(df_struct, sep=":")
>
>         df_struct_flat.printSchema()
>
>         root
>         |-- state: string (nullable = true)
>         |-- gender: string (nullable = true)
>         |-- name:firstname: string (nullable = true)
>         |-- name:middlename: string (nullable = true)
>         |-- name:lastname: string (nullable = true)
>         """
>     # compute Complex Fields (Arrays, Structs and Maptypes) in Schema
>     complex_fields = dict([(field.name, field.dataType)
>                             for field in df.schema.fields
>                             if type(field.dataType) == ArrayType
>                             or type(field.dataType) == StructType
>                             or type(field.dataType) == MapType])
>
>     while len(complex_fields) !=0:
>         col_name = list(complex_fields.keys())[0]
>         #print ("Processing :"+col_name+" Type :
> "+str(type(complex_fields[col_name])))
>
>         # if StructType then convert all sub element to columns.
>         # i.e. flatten structs
>         if (type(complex_fields[col_name]) == StructType):
>             expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
>             for k in [n.name for n in complex_fields[col_name]]]
>             df = df.select("*", *expanded).drop(col_name)
>
>         # if ArrayType then add the Array Elements as Rows using the
> explode function
>         # i.e. explode Arrays
>         elif (type(complex_fields[col_name]) == ArrayType):
>             df = df.withColumn(col_name, explode_outer(col_name))
>
>         # if MapType then convert all sub element to columns.
>         # i.e. flatten
>         elif (type(complex_fields[col_name]) == MapType):
>             keys_df =
> df.select(explode_outer(map_keys(col(col_name)))).distinct()
>             keys = list(map(lambda row: row[0], keys_df.collect()))
>             key_cols = list(map(lambda f: col(col_name).getItem(f)
>             .alias(str(col_name + sep + f)), keys))
>             drop_column_list = [col_name]
>             df = df.select([col_name for col_name in df.columns
>             if col_name not in drop_column_list] + key_cols)
>
>         # recompute remaining Complex Fields in Schema
>         complex_fields = dict([(field.name, field.dataType)
>                             for field in df.schema.fields
>                             if type(field.dataType) == ArrayType
>                             or type(field.dataType) == StructType
>                             or type(field.dataType) == MapType])
>
>     return df
>
>
> *Function to read each file, and apply the functions and save each file as
> JSON.*
>
> def json_to_norm_with_null(dir_path, path_to_save):
>     path = dir_path
>
>     for filename in os.listdir(path):
>         if not filename.endswith('._stript_list.json'):
>             continue
>
>
>         fullname = os.path.join(path, filename)
>         with open(fullname) as json_file:
>             jsonstr = json.load(json_file)
>
>         df = spark.read.json(fullname)
>         df = cleanDataFrame(df)
>         df = flatten_test(df, sep=":")
>         df.write.mode('append').option('compression',
> 'snappy').option("ignoreNullFields", "false").json(path_to_save)
>
>
> *Function to start everything of. With hopefully 10 processes.*
>
> from multiprocessing.pool import ThreadPool
> tpool = ThreadPool(processes=10)
>
> tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02",
> '/home/jovyan/notebooks/falk/F02.json'))
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297

Reply via email to