Hi,

I am having an issue that looks like a potentially serious bug in Spark
2.4.3, as it impacts data accuracy. I have searched the Spark Jira and
mailing lists as best I can and cannot find any reference to anyone else
having this issue. I am not sure whether it would be suitable to raise as a
bug in the Spark Jira, so I thought I should request help here.

The simplest summary of my suspected bug is:
Using PySpark with Spark 2.4.3, a MultilayerPerceptron model gives
inconsistent outputs if a large amount of data is fed into it and at least
one of the model outputs is fed to a Python UDF.

After doing some debugging I haven't been able to get to the bottom of this
or recreate it 100% reliably (though it does happen very frequently), but I
have narrowed the problem down somewhat and produced some stripped-down code
that demonstrates it. Some observations I have made along the way:
- I can recreate the problem with a very simple MultilayerPerceptron with no
hidden layers (just 14 features and 2 outputs); I also see it with a more
complex MultilayerPerceptron model.
- I cannot recreate the problem unless the model output is fed to a Python
UDF: removing the UDF gives correct model outputs, while adding it makes the
model outputs inconsistent (note that it is not just the Python UDF outputs
that are inconsistent; see the control-case sketch after this list)
- I cannot recreate the problem on minuscule amounts of data or when my data
is heavily partitioned; 100,000 rows of input across 2 partitions sees the
issue happen most of the time.
- Some of the bad outputs I get could be explained if certain features were
zero when they came into the model (they are not zero in my actual feature
data)
- I can recreate the problem on several different environments (with the
same setup), so I don't think it is an issue with my hardware.
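
For reference, here is the control variant mentioned above (a minimal
sketch; it assumes the same mlp_model and features_vector variables defined
in the full repro code below). With the Python UDF column removed, the
distinct count consistently comes back as 1, as expected:

# Control case: identical model transform, but no Python UDF column.
# With this variant I consistently get 1 distinct row, as expected.
no_udf_results = mlp_model.transform(features_vector)
print(no_udf_results.select('rawPrediction', 'probability').distinct().count())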

My environment is CentOS 7.6 with Python 3.6.3 and Spark 2.4.3. I do not
have the native libraries for MLlib installed. I'm aware that later releases
of Spark are available, so please let me know if that is a problem (I would
have difficulty getting a later release installed in my environment,
otherwise I would test with one myself).

The code sample below triggers the problem for me the vast majority of the
time when run from a pyspark shell. It generates a dataframe containing
100,000 identical rows, transforms it with a MultilayerPerceptron model, and
feeds one of the model output columns to a simple Python UDF to generate an
additional column. The resulting dataframe then has its distinct rows
selected; since all the inputs are identical I would expect to get 1 row
back, but instead I get many unique rows, with the number returned varying
each time I run the code. To run the code you will need the model files
locally. I have attached the model as a zip archive to this message (I
hope); unzipping it to /tmp should be all you need to do.

Please let me know if I have done anything wrong in this report. I haven't
posted to a mailing list like this before, so I am unsure of the format and
expectations when raising a message.

model.zip:
http://apache-spark-user-list.1001560.n3.nabble.com/file/t10909/model.zip


import pyspark
from pyspark.sql import functions as func
from pyspark.sql.types import FloatType

from pyspark.ml.classification import MultilayerPerceptronClassificationModel
from pyspark.ml.feature import VectorAssembler

#############
# Running from a pyspark shell, so stop the existing context and rebuild it
# with the configuration that reproduces the issue.
sc.stop()

conf = pyspark.SparkConf().setMaster(
    'local'
).setAppName(
    'bug-Testing2'
).set(
    'spark.executor.memory', '1G'
).set(
    'spark.executor.cores', '1'
).set(
    'spark.executor.instances', '1'
).set(
    'spark.sql.shuffle.partitions', '1'
).set(
    'spark.default.parallelism', '2'
)

sc = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SparkSession(sc)
#############

# Generate 100,000 identical rows, each with 14 feature columns.
data_array = []
for _ in range(100000):
    data_array.append((1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1))

df = spark.createDataFrame(data_array)


# Load the pre-trained MultilayerPerceptron model attached to this message.
mlp_model = MultilayerPerceptronClassificationModel.load("file:///tmp/model")


# Assemble the 14 generated columns (_1 .. _14) into a single feature vector.
features_vector = VectorAssembler(
    inputCols=['_{}'.format(i) for i in range(1, 15)],
    outputCol="scaledFeatures"
).transform(df).select(['scaledFeatures'])

features_vector.cache()

# Simple Python UDF that extracts one element from a vector column.
def __return(vec, position):
    return float(vec[position])

__return_udf = func.udf(__return, FloatType())


# Score the feature vectors, then feed the model's 'probability' output
# column into the Python UDF to create an additional column.
transform_result = mlp_model.transform(features_vector)

final_results = transform_result.withColumn(
    'python_score',
    __return_udf('probability', func.lit(1))
)

final_results.select('python_score','rawPrediction','probability').distinct().collect()
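
A quick way to quantify the failure is to count the distinct rows rather
than collecting them (a minimal variation on the line above; the comments
describe what I observe on my environment):

# All 100,000 input rows are identical, so 1 distinct row is expected.
# On my environment this prints a number greater than 1 that varies per run.
print(final_results.select(
    'python_score', 'rawPrediction', 'probability'
).distinct().count())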


