Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-20 Thread Xavier Gervilla
Thank you for the flatten function. It does more than I need for my project,
but the examples (which were really, really useful) helped me find a solution.

Instead of accessing the confidence and entity attributes (metadata.confidence
and metadata.entity), I was accessing metadata.value, which gave null values
instead of returning an error. In addition, the confidence value (a number)
has StringType, so I had to convert it to a numeric type before calculating
the average.

The output generated now is the same as with Spark but using only around 8.5GB 
of RAM so there's no longer a memory error!
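
As a plain-Python illustration of the two pitfalls described above (this is not Spark NLP code): the `annotations` records below are hypothetical stand-ins for rows whose `metadata` map holds string values. Looking up a key that is not in the map yields `None` instead of raising, and the confidence has to be converted from string to float before averaging. In Spark the analogous step would presumably be a cast on the column, for example `Column.cast("double")`.

```python
from statistics import mean

# Hypothetical records shaped like NER annotations: metadata maps strings to strings.
annotations = [
    {"result": "Madrid", "metadata": {"entity": "LOC", "confidence": "0.90"}},
    {"result": "Madrid", "metadata": {"entity": "LOC", "confidence": "0.70"}},
]


def mean_confidence(rows):
    """Average the confidence values after converting them from str to float."""
    values = [
        float(row["metadata"]["confidence"])
        for row in rows
        if row["metadata"].get("confidence") is not None
    ]
    return mean(values) if values else None


# A key that does not exist in the map gives None rather than an error.
missing = annotations[0]["metadata"].get("value")
average = mean_confidence(annotations)
```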

Thank you again for all your help!


Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-20 Thread Bjørn Jørgensen
Glad to hear that it works :)

Your dataframe is nested with both map, array and struct.

I'm using this function to flatten a nested dataframe to rows and columns.

from pyspark.sql.types import *
from pyspark.sql.functions import *

def flatten_test(df, sep="_"):
    """Returns a flattened dataframe.
        .. versionadded:: x.X.X

        sep : str
            Delimiter for flattened columns. Default `_`

        Don't use `.` as `sep`
        It won't work on nested data frames with more than one level.
        And you will have to use ``.

        Flattening Map Types will have to find every key in the column.
        This can be slow.

        data_mixed = [
            {
                "state": "Florida",
                "shortname": "FL",
                "info": {"governor": "Rick Scott"},
                "counties": [
                    {"name": "Dade", "population": 12345},
                    {"name": "Broward", "population": 4},
                    {"name": "Palm Beach", "population": 6},
                ],
            },
            {
                "state": "Ohio",
                "shortname": "OH",
                "info": {"governor": "John Kasich"},
                "counties": [
                    {"name": "Summit", "population": 1234},
                    {"name": "Cuyahoga", "population": 1337},
                ],
            },
        ]

        data_mixed = spark.createDataFrame(data=data_mixed)


        |-- counties: array (nullable = true)
        |    |-- element: map (containsNull = true)
        |    |    |-- key: string
        |    |    |-- value: string (valueContainsNull = true)
        |-- info: map (nullable = true)
        |    |-- key: string
        |    |-- value: string (valueContainsNull = true)
        |-- shortname: string (nullable = true)
        |-- state: string (nullable = true)

        data_mixed_flat = flatten_test(data_mixed, sep=":")
        |-- shortname: string (nullable = true)
        |-- state: string (nullable = true)
        |-- counties:name: string (nullable = true)
        |-- counties:population: string (nullable = true)
        |-- info:governor: string (nullable = true)

        data = [
            {
                "id": 1,
                "name": "Cole Volk",
                "fitness": {"height": 130, "weight": 60},
            },
            {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
            {
                "id": 2,
                "name": "Faye Raker",
                "fitness": {"height": 130, "weight": 60},
            },
        ]

        df = spark.createDataFrame(data=data)


        |-- fitness: map (nullable = true)
        |    |-- key: string
        |    |-- value: long (valueContainsNull = true)
        |-- id: long (nullable = true)
        |-- name: string (nullable = true)

        df_flat = flatten_test(df, sep=":")


        |-- id: long (nullable = true)
        |-- name: string (nullable = true)
        |-- fitness:height: long (nullable = true)
        |-- fitness:weight: long (nullable = true)

        data_struct = [

        schema = StructType([
            StructField('name', StructType([
                StructField('firstname', StringType(), True),
                StructField('middlename', StringType(), True),
                StructField('lastname', StringType(), True)
            ])),
            StructField('state', StringType(), True),
            StructField('gender', StringType(), True)
        ])

        df_struct = spark.createDataFrame(data=data_struct, schema=schema)


        |-- name: struct (nullable = true)
        |    |-- firstname: string (nullable = true)
        |    |-- middlename: string (nullable = true)
        |    |-- lastname: string (nullable = true)
        |-- state: string (nullable = true)
        |-- gender: string (nullable = true)

        df_struct_flat = flatten_test(df_struct, sep=":")


        |-- state: string (nullable = true)
        |-- gender: string (nullable = true)
        |-- name:firstname: string (nullable = true)
        |-- name:middlename: string (nullable = true)
        |-- name:lastname: string (nullable = true)
    """
    # compute Complex Fields (Arrays, Structs and Map Types) in Schema
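
To make the flattening idea concrete without a Spark session, here is a small plain-Python sketch that works on ordinary dicts and lists. It is not the `flatten_test` function from this message; `flatten_record` is a hypothetical helper that only mirrors the behaviour shown in the docstring examples: nested keys are joined with `sep`, and each element of a list becomes its own row.

```python
from itertools import product


def flatten_record(record, sep="_", prefix=""):
    """Return a list of flat dicts built from one nested record.

    Nested dicts extend the column name; lists yield one row per element.
    """
    rows = [{}]
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            parts = flatten_record(value, sep, name)
        elif isinstance(value, list):
            parts = []
            for item in value:
                if isinstance(item, dict):
                    parts.extend(flatten_record(item, sep, name))
                else:
                    parts.append({name: item})
            if not parts:
                parts = [{name: None}]  # keep the column for empty lists
        else:
            parts = [{name: value}]
        # Combine the rows built so far with the rows produced by this field.
        rows = [{**left, **right} for left, right in product(rows, parts)]
    return rows


record = {
    "state": "Florida",
    "shortname": "FL",
    "info": {"governor": "Rick Scott"},
    "counties": [
        {"name": "Dade", "population": 12345},
        {"name": "Broward", "population": 4},
    ],
}
rows = flatten_record(record, sep=":")
```

With this input, `rows` holds one flat dict per county, each carrying the `state`, `shortname` and `info:governor` values alongside `counties:name` and `counties:population`.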

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Bjørn Jørgensen

Change spark = sparknlp.start() to:
spark = sparknlp.start(spark32=True)

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Bjørn Jørgensen
Yes, there are some that have that issue.

Please open a new issue at
and they will help you.

On Tue, 19 Apr 2022 at 20:33, Xavier Gervilla wrote:

> Thank you for your advice. I had little knowledge of Spark NLP and I
> thought it could only be used with models that required training, which
> is not the case for my project. I'm now trying to build the project
> again with Spark NLP, but when I try to load a pretrained model from
> JohnSnowLabs I get an error (py4j.protocol.Py4JJavaError: An error
> occurred while calling
> z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize.
> ).
> This is the new basic code to develop the project again:
> spark = sparknlp.start()
> pipelineName = 'analyze_sentiment'
> pipeline = PretrainedPipeline(pipelineName, 'en')  # this is the line that generates the error
> rawTweets = spark.readStream.format('socket').option('host', 'localhost').option('port', 9008).load()
> allTweets = rawTweets.selectExpr('CAST(value AS STRING)').withColumnRenamed('value', 'text').dropDuplicates('text')
> sentPred = pipeline.transform(allTweets)
> query = sentPred.writeStream.outputMode('complete').format('console').start()
> query.awaitTermination()
> The Spark version is 3.2.1 and the Spark NLP version is 3.4.3, while the
> Java version is 8. I've tried a different model but the error is still
> the same, so what could be causing it?
> If this error is solved, I think Spark NLP will be the solution I was
> looking for to reduce memory consumption, so thank you again for
> suggesting it.
Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Jungtaek Lim
I have no context on ML, but your "streaming" query exposes the possibility
of memory issues.

>>> querySelect = "SELECT col as entity, avg(sentiment) as sentiment,
>>> count(col) as count FROM df GROUP BY col"
>>> finalDF = spark.sql(querySelect)
>>> query =
>>> finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
Since this is a streaming query, the grouped aggregation requires a state
store, and because you use complete output mode, the state store will grow
over time and come to dominate the memory in the executors.
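
The effect can be pictured with a small plain-Python simulation (no Spark involved). The `batches` data and the `update_state` helper are hypothetical; the dictionary plays the role of the state that a streaming `GROUP BY` has to keep, holding one running sum and count per entity ever seen.

```python
from collections import defaultdict


def update_state(state, batch):
    """Fold one micro-batch of (entity, sentiment) pairs into the running totals."""
    for entity, sentiment in batch:
        total, count = state[entity]
        state[entity] = (total + sentiment, count + 1)
    return state


# Hypothetical micro-batches; each one introduces entities not seen before.
batches = [
    [("Madrid_GPE", 0.5), ("Spark_ORG", 1.0)],
    [("Madrid_GPE", -0.5), ("Oslo_GPE", 0.0)],
    [("Seoul_GPE", 1.0), ("Austin_GPE", 0.5)],
]

state = defaultdict(lambda: (0.0, 0))
state_sizes = []
for batch in batches:
    update_state(state, batch)
    # Every key must stay available for the next full result, so none is dropped.
    state_sizes.append(len(state))
```

Here `state_sizes` only ever increases, because a stream of tweets keeps producing new entity strings and no key is ever evicted.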

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Bjørn Jørgensen
When did SpaCy have support for Spark?

Try Spark NLP; it's made for Spark. They have
a lot of notebooks at and they
publish user guides at


Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Sean Owen
It looks good, are you sure it even starts? The problem I see is that you
send a copy of the model from the driver for every task. Try broadcasting
the model instead. I'm not sure if that resolves it, but it would be good
practice.

[Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Xavier Gervilla

Hi Team,

I'm developing a project that retrieves tweets on a 'host' app, streams them
to Spark and, through different DataFrame operations, obtains the sentiment of
the tweets and their entities by applying a sentiment model and a NER model
respectively.

The problem I've come across is that when applying the NER model, the RAM
consumption increases until the program stops with a memory error because
there's no memory left to execute. In addition, on the Spark UI I've seen that
there's only one executor running, the executor driver, but using htop on the
terminal I see that the 8 cores of the instance are executing at 100%.

The SparkSession is only configured to receive the tweets from the socket
that connects with the second program that sends the tweets. The DataFrame
goes through some processing to obtain other properties of the tweet like
its sentiment (which causes no error even with less than 8GB of RAM) and
then the NER is applied.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
tweets = rawTweets.selectExpr("CAST(value AS STRING)")

#prior processing of the tweets
sentDF = other_processing(tweets)

#obtaining the column that contains the list of entities from a tweet
nerDF = ner_classification(sentDF)

This is the code of the functions related to obtaining the NER, the "main call" 
and the UDF function.

import spacy
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType, StringType

nerModel = spacy.load("en_core_web_sm")

#main call, applies the UDF function to every tweet from the "tweet" column
def ner_classification(words):
    ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
    words = words.withColumn("nerlist", ner_list("tweet"))
    return words

#udf function
def obtain_ner_udf(words):
    #if the tweet is empty return None
    if words == "":
        return None
    #else: applying the NER model (Spacy en_core_web_sm)
    entities = nerModel(words)

    #returns a list of the form ['entity1_label1', 'entity2_label2',...]
    return [ word.text + '_' + word.label_ for word in entities.ents ]

And lastly I map each entity to the sentiment of its tweet and obtain the
average sentiment of the entity and the number of appearances.

flattenedNER =, explode(nerDF.nerlist))
flattenedNER.registerTempTable("df")

querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
finalDF = spark.sql(querySelect)

query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()

The resulting DF is processed with a function that separates each column in a 
list and prints it.

def processBatch(df, epoch_id):
    entities = [str(t.entity) for t in df.select("entity").collect()]

    sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]

    counts = [int(row.asDict()['count']) for row in df.select("count").collect()]

    print(entities, sentiments, counts)
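
Each `collect()` above is a separate action, so one possible variation is to collect the batch once and split the rows into columns afterwards. The sketch below shows only the splitting step in plain Python; `collected` is hypothetical data standing in for something like `[row.asDict() for row in df.collect()]`, and `split_columns` is an illustrative helper, not part of the original code.

```python
# Hypothetical stand-in for rows collected once and converted to dicts.
collected = [
    {"entity": "Madrid_GPE", "sentiment": 0.25, "count": 4},
    {"entity": "Spark_ORG", "sentiment": 0.8, "count": 2},
]


def split_columns(rows):
    """Turn a list of row dicts into one list per column, with the same type conversions."""
    entities = [str(row["entity"]) for row in rows]
    sentiments = [float(row["sentiment"]) for row in rows]
    counts = [int(row["count"]) for row in rows]
    return entities, sentiments, counts


entities, sentiments, counts = split_columns(collected)
print(entities, sentiments, counts)
```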

At first I tried other NER models from Flair, and they have the same effect:
after printing the first batch, memory use starts increasing until it fails
and stops the execution because of the memory error. When applying a "simple"
function instead of the NER model, such as return words.split() in the UDF,
there's no such error, so it should be the model, not the ingested data,
that is causing the overload.

Is there a way to prevent the excessive RAM consumption? Why is there only the
driver executor and no other executors are generated? How could I prevent it
from collapsing when applying the NER model?

Thanks in advance!