So now I have tried to run this function in a ThreadPool, but it doesn't seem to work.
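For reference, here is a minimal, Spark-free sketch of the `ThreadPool.map` calling convention as I understand it (the `square` toy function is just for illustration): `map` takes the callable itself plus an iterable of arguments, and fans the calls out across the pool.

from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

with ThreadPool(processes=4) as pool:
    # Pass the function object itself, not a call;
    # map() blocks until all tasks have finished.
    results = pool.map(square, [1, 2, 3, 4])

print(results)  # [1, 4, 9, 16]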
[image: image.png]

---------- Forwarded message ---------
From: Sean Owen <sro...@gmail.com>
Date: Wed, 20 Jul 2022 at 22:43
Subject: Re: Pyspark and multiprocessing
To: Bjørn Jørgensen <bjornjorgen...@gmail.com>

I don't think you ever say what doesn't work?

On Wed, Jul 20, 2022 at 3:40 PM Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
> I have 400k JSON files, each between 10 kB and 500 kB in size.
> They don't have the same schema, so I have to loop over them one at a time.
>
> This works, but it's very slow. The process takes 5 days!
>
> So now I have tried to run these functions in a ThreadPool, but it doesn't
> seem to work.
>
> *Start local Spark. The system has 16 cores and 64 GB of memory.*
>
> import multiprocessing
> import os
>
> from pyspark import SparkConf
> from pyspark.sql import SparkSession
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  # e.g. 4015976448
> memory_gb = int(mem_bytes / (1024. ** 3))  # e.g. 3.74
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
>     conf.setMaster('local[{}]'.format(number_cores))
>     conf \
>         .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>         .set("spark.sql.repl.eagerEval.enabled", "True") \
>         .set("spark.sql.adaptive.enabled", "True") \
>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
>         .set("spark.sql.repl.eagerEval.maxNumRows", "10000")
>
>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
>
> spark = get_spark_session("Falk", SparkConf())
>
>
> *Function to rename columns that contain characters such as `-`, `&`, `"`, `[`, `]` and `.`.*
>
> from copy import copy
>
> from pyspark.sql import DataFrame
> from pyspark.sql.types import ArrayType, DataType, StructField, StructType
>
>
> # We take a dataframe and return a new one with the required changes.
> def cleanDataFrame(df: DataFrame) -> DataFrame:
>     # Returns a new sanitized field name (this function can be anything really).
>     def sanitizeFieldName(s: str) -> str:
>         return s.replace("-", "_").replace("&", "_").replace("\"", "_") \
>             .replace("[", "_").replace("]", "_").replace(".", "_")
>
>     # We call this on all fields to create a copy and to perform any
>     # changes we might want to do to the field.
>     def sanitizeField(field: StructField) -> StructField:
>         field = copy(field)
>         field.name = sanitizeFieldName(field.name)
>         # We recursively call cleanSchema on all types.
>         field.dataType = cleanSchema(field.dataType)
>         return field
>
>     def cleanSchema(dataType: DataType) -> DataType:
>         dataType = copy(dataType)
>         # If the type is a StructType we need to recurse; otherwise we can
>         # return, since we've reached a leaf node.
>         if isinstance(dataType, StructType):
>             # We call our sanitizer for all top-level fields.
>             dataType.fields = [sanitizeField(f) for f in dataType.fields]
>         elif isinstance(dataType, ArrayType):
>             dataType.elementType = cleanSchema(dataType.elementType)
>         return dataType
>
>     # Now that we have the new schema, we can create a new DataFrame using
>     # the old frame's RDD as the data and the new schema as its schema.
>     return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
>
>
> *Function to flatten out a nested dataframe.*
>
> from pyspark.sql.types import *
> from pyspark.sql.functions import *
>
>
> def flatten_test(df, sep="_"):
>     """Returns a flattened dataframe.
>
>     .. versionadded:: x.X.X
>
>     Parameters
>     ----------
>     sep : str
>         Delimiter for flattened columns. Default `_`.
>
>     Notes
>     -----
>     Don't use `.` as `sep`: it won't work on nested data frames with more
>     than one level, and you will have to use `columns.name`.
>
>     Flattening Map Types will have to find every key in the column.
>     This can be slow.
>
>     Examples
>     --------
>     data_mixed = [
>         {
>             "state": "Florida",
>             "shortname": "FL",
>             "info": {"governor": "Rick Scott"},
>             "counties": [
>                 {"name": "Dade", "population": 12345},
>                 {"name": "Broward", "population": 40000},
>                 {"name": "Palm Beach", "population": 60000},
>             ],
>         },
>         {
>             "state": "Ohio",
>             "shortname": "OH",
>             "info": {"governor": "John Kasich"},
>             "counties": [
>                 {"name": "Summit", "population": 1234},
>                 {"name": "Cuyahoga", "population": 1337},
>             ],
>         },
>     ]
>
>     data_mixed = spark.createDataFrame(data=data_mixed)
>
>     data_mixed.printSchema()
>
>     root
>      |-- counties: array (nullable = true)
>      |    |-- element: map (containsNull = true)
>      |    |    |-- key: string
>      |    |    |-- value: string (valueContainsNull = true)
>      |-- info: map (nullable = true)
>      |    |-- key: string
>      |    |-- value: string (valueContainsNull = true)
>      |-- shortname: string (nullable = true)
>      |-- state: string (nullable = true)
>
>     data_mixed_flat = flatten_test(data_mixed, sep=":")
>     data_mixed_flat.printSchema()
>
>     root
>      |-- shortname: string (nullable = true)
>      |-- state: string (nullable = true)
>      |-- counties:name: string (nullable = true)
>      |-- counties:population: string (nullable = true)
>      |-- info:governor: string (nullable = true)
>
>     data = [
>         {
>             "id": 1,
>             "name": "Cole Volk",
>             "fitness": {"height": 130, "weight": 60},
>         },
>         {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
>         {
>             "id": 2,
>             "name": "Faye Raker",
>             "fitness": {"height": 130, "weight": 60},
>         },
>     ]
>
>     df = spark.createDataFrame(data=data)
>
>     df.printSchema()
>
>     root
>      |-- fitness: map (nullable = true)
>      |    |-- key: string
>      |    |-- value: long (valueContainsNull = true)
>      |-- id: long (nullable = true)
>      |-- name: string (nullable = true)
>
>     df_flat = flatten_test(df, sep=":")
>
>     df_flat.printSchema()
>
>     root
>      |-- id: long (nullable = true)
>      |-- name: string (nullable = true)
>      |-- fitness:height: long (nullable = true)
>      |-- fitness:weight: long (nullable = true)
>
>     data_struct = [
>         (("James", None, "Smith"), "OH", "M"),
>         (("Anna", "Rose", ""), "NY", "F"),
>         (("Julia", "", "Williams"), "OH", "F"),
>         (("Maria", "Anne", "Jones"), "NY", "M"),
>         (("Jen", "Mary", "Brown"), "NY", "M"),
>         (("Mike", "Mary", "Williams"), "OH", "M")
>     ]
>
>     schema = StructType([
>         StructField('name', StructType([
>             StructField('firstname', StringType(), True),
>             StructField('middlename', StringType(), True),
>             StructField('lastname', StringType(), True)
>         ])),
>         StructField('state', StringType(), True),
>         StructField('gender', StringType(), True)
>     ])
>
>     df_struct = spark.createDataFrame(data=data_struct, schema=schema)
>
>     df_struct.printSchema()
>
>     root
>      |-- name: struct (nullable = true)
>      |    |-- firstname: string (nullable = true)
>      |    |-- middlename: string (nullable = true)
>      |    |-- lastname: string (nullable = true)
>      |-- state: string (nullable = true)
>      |-- gender: string (nullable = true)
>
>     df_struct_flat = flatten_test(df_struct, sep=":")
>
>     df_struct_flat.printSchema()
>
>     root
>      |-- state: string (nullable = true)
>      |-- gender: string (nullable = true)
>      |-- name:firstname: string (nullable = true)
>      |-- name:middlename: string (nullable = true)
>      |-- name:lastname: string (nullable = true)
>     """
>     # Compute the complex fields (Arrays, Structs and MapTypes) in the schema.
>     complex_fields = dict([(field.name, field.dataType)
>                            for field in df.schema.fields
>                            if type(field.dataType) == ArrayType
>                            or type(field.dataType) == StructType
>                            or type(field.dataType) == MapType])
>
>     while len(complex_fields) != 0:
>         col_name = list(complex_fields.keys())[0]
>         # print("Processing: " + col_name + " Type: " + str(type(complex_fields[col_name])))
>
>         # If StructType, convert all sub-elements to columns,
>         # i.e. flatten structs.
>         if (type(complex_fields[col_name]) == StructType):
>             expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
>                         for k in [n.name for n in complex_fields[col_name]]]
>             df = df.select("*", *expanded).drop(col_name)
>
>         # If ArrayType, add the array elements as rows using the
>         # explode function, i.e. explode arrays.
>         elif (type(complex_fields[col_name]) == ArrayType):
>             df = df.withColumn(col_name, explode_outer(col_name))
>
>         # If MapType, convert all sub-elements to columns, i.e. flatten.
>         elif (type(complex_fields[col_name]) == MapType):
>             keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
>             keys = list(map(lambda row: row[0], keys_df.collect()))
>             key_cols = list(map(lambda f: col(col_name).getItem(f)
>                                 .alias(str(col_name + sep + f)), keys))
>             drop_column_list = [col_name]
>             df = df.select([col_name for col_name in df.columns
>                             if col_name not in drop_column_list] + key_cols)
>
>         # Recompute the remaining complex fields in the schema.
>         complex_fields = dict([(field.name, field.dataType)
>                                for field in df.schema.fields
>                                if type(field.dataType) == ArrayType
>                                or type(field.dataType) == StructType
>                                or type(field.dataType) == MapType])
>
>     return df
>
>
> *Function to read each file, apply the functions, and save each file as JSON.*
>
> def json_to_norm_with_null(dir_path, path_to_save):
>     path = dir_path
>
>     for filename in os.listdir(path):
>         if not filename.endswith('._stript_list.json'):
>             continue
>
>         fullname = os.path.join(path, filename)
>
>         df = spark.read.json(fullname)
>         df = cleanDataFrame(df)
>         df = flatten_test(df, sep=":")
>         df.write.mode('append').option('compression', 'snappy') \
>             .option("ignoreNullFields", "false").json(path_to_save)
>
>
> *Function to start everything off, hopefully with 10 processes.*
>
> from multiprocessing.pool import ThreadPool
>
> tpool = ThreadPool(processes=10)
>
> tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02",
>                                  '/home/jovyan/notebooks/falk/F02.json'))
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
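PS: Looking at the last step again, I suspect `map` never gets a chance to parallelize anything: `json_to_norm_with_null(...)` is executed eagerly on the calling thread, and `tpool.map` then receives its `None` return value instead of a callable plus an iterable. A minimal sketch of the pattern I am after, with one task per file (the `process_file` helper below is my own naming, not one of the functions above):

import os
from multiprocessing.pool import ThreadPool

def process_file(fullname, path_to_save='/home/jovyan/notebooks/falk/F02.json'):
    # One file per task: read, sanitize, flatten, and append to the output.
    # Spark supports job submission from multiple threads, so the shared
    # SparkSession can be reused here.
    df = spark.read.json(fullname)
    df = cleanDataFrame(df)
    df = flatten_test(df, sep=":")
    df.write.mode('append').option('compression', 'snappy') \
        .option("ignoreNullFields", "false").json(path_to_save)

dir_path = "/home/jovyan/notebooks/falk/data/form_version/F02"
files = [os.path.join(dir_path, f) for f in os.listdir(dir_path)
         if f.endswith('._stript_list.json')]

# map() gets the callable itself plus the iterable of file paths,
# so the 10 threads pull files off the list as they become free.
with ThreadPool(processes=10) as tpool:
    tpool.map(process_file, files)

One caveat I am not sure about: concurrent `mode('append')` writes into the same output directory may still step on each other, so writing each task's output to its own subdirectory and merging afterwards might be the safer variant.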