Hi Team,
https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration
I'm developing a project that retrieves tweets in a 'host' app and streams
them to Spark. There, a series of DataFrame operations obtains the sentiment
of each tweet and its entities by applying a sentiment model and a NER model,
respectively.
The problem I've come across is that when the NER model is applied, RAM
consumption keeps increasing until the program stops with a memory error.
In addition, the Spark UI shows only one executor running, the driver
executor, yet htop in the terminal shows all 8 cores of the instance at 100%.
The SparkSession is configured only to receive the tweets from the socket
connected to the second program, which sends the tweets. The DataFrame goes
through some processing to obtain other properties of the tweet, such as its
sentiment (which causes no error even with less than 8 GB of RAM), and then
the NER model is applied.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
rawTweets = (
    spark.readStream.format("socket")
    .option("host", "localhost")
    .option("port", 9008)
    .load()
)
tweets = rawTweets.selectExpr("CAST(value AS STRING)")
# prior processing of the tweets
sentDF = other_processing(tweets)
# obtain the column that contains the list of entities of each tweet
nerDF = ner_classification(sentDF)

This is the code related to obtaining the entities: the "main call" and the
UDF.

import spacy
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

nerModel = spacy.load("en_core_web_sm")

# main call: applies the UDF to every tweet in the "tweet" column
def ner_classification(words):
    ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
    words = words.withColumn("nerlist", ner_list("tweet"))
    return words

# UDF body: runs the NER model on a single tweet
def obtain_ner_udf(words):
    # if the tweet is null or empty, return None
    if not words:
        return None
    # otherwise apply the NER model (spaCy en_core_web_sm)
    entities = nerModel(words)
    # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
    return [ent.text + "_" + ent.label_ for ent in entities.ents]

Lastly, I map each entity to the sentiment of its tweet and compute the
average sentiment and the number of appearances of each entity.

from pyspark.sql.functions import explode

flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
# createOrReplaceTempView replaces the deprecated registerTempTable
flattenedNER.createOrReplaceTempView("df")
querySelect = (
    "SELECT col AS entity, avg(sentiment) AS sentiment, count(col) AS count "
    "FROM df GROUP BY col"
)
finalDF = spark.sql(querySelect)
query = (
    finalDF.writeStream.foreachBatch(processBatch)
    .outputMode("complete")
    .start()
)

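To make the intended result concrete, here is a plain-Python sketch (no Spark) of what the explode plus GROUP BY step is meant to compute. `aggregate_entities` and the sample rows are made up for illustration, and the sketch assumes every row has a non-null sentiment.

```python
from collections import defaultdict


def aggregate_entities(rows):
    """rows: iterable of (sentiment, nerlist) pairs; nerlist may be None.

    Returns {entity: (average_sentiment, count)}.
    """
    totals = defaultdict(lambda: [0.0, 0])  # entity -> [sentiment sum, count]
    for sentiment, nerlist in rows:
        # Like explode(), rows with a null or empty list contribute nothing.
        for entity in nerlist or []:
            totals[entity][0] += sentiment
            totals[entity][1] += 1
    return {entity: (s / n, n) for entity, (s, n) in totals.items()}


# Made-up sample: two tweets mention Spark_ORG, one tweet has no entities.
sample = [
    (1.0, ["Spark_ORG", "London_GPE"]),
    (0.0, ["Spark_ORG"]),
    (0.5, None),
]
aggregated = aggregate_entities(sample)
```

Because the query runs in "complete" output mode, each batch receives this aggregate over all tweets seen so far, not only the newest ones.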
Each batch of the resulting DataFrame is processed by a function that
separates the columns into lists and prints them.

def processBatch(df, epoch_id):
    entities = [str(t.entity) for t in df.select("entity").collect()]
    sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
    # asDict() is used because Row.count resolves to the built-in tuple method
    counts = [int(row.asDict()["count"]) for row in df.select("count").collect()]
    print(entities, sentiments, counts)

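processBatch as written calls collect() three times, so the batch result is brought to the driver three times, and the row order is not guaranteed to match across the three lists. Below is a sketch of splitting the columns from a single collected list instead. `split_columns` is a hypothetical helper, and the rows are plain dicts here so the example runs without Spark.

```python
def split_columns(rows):
    """rows: list of dicts with 'entity', 'sentiment' and 'count' keys.

    Returns three parallel lists in the same row order.
    """
    entities = [str(r["entity"]) for r in rows]
    sentiments = [float(r["sentiment"]) for r in rows]
    counts = [int(r["count"]) for r in rows]
    return entities, sentiments, counts


# Inside processBatch the input would come from one collect(), e.g.:
#   rows = [r.asDict() for r in df.collect()]
# Made-up rows standing in for that result:
rows = [
    {"entity": "Spark_ORG", "sentiment": 0.5, "count": 2},
    {"entity": "London_GPE", "sentiment": 1.0, "count": 1},
]
entities, sentiments, counts = split_columns(rows)
```

This keeps the three lists aligned by construction, since they are all derived from the same collected rows.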
At first I tried other NER models, from Flair, and they have the same effect:
after the first batch is printed, memory use starts increasing until
execution fails with the memory error. When the UDF applies a "simple"
function instead of the NER model, such as return words.split(), there is no
such error, so the overload seems to be caused by the model rather than by
the ingested data.
Is there a way to prevent the excessive RAM consumption? Why is there only the
driver executor, with no other executors created? How can I keep the
application from crashing when the NER model is applied?
Thanks in advance!