[ https://issues.apache.org/jira/browse/SPARK-22802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292527#comment-16292527 ]

Annamalai Venugopal commented on SPARK-22802:
---------------------------------------------

I am working on a project with Spark; the source code is as follows:

import sys
import os
from nltk.stem import SnowballStemmer
from nltk.corpus import wordnet as wn
from os import listdir
from os.path import isfile, join
from pyspark.sql.types import *

from pyspark.ml.feature import RegexTokenizer
from pyspark.sql.functions import udf
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.linalg import SparseVector, VectorUDT

lmtzr = SnowballStemmer("english")

# Set the path for spark installation
# this is the path where you downloaded and uncompressed the Spark download
# Using forward slashes on windows, \\ should work too.
os.environ['SPARK_HOME'] = "C:/Users/avenugopal/Downloads/spark-2.2.0-bin-hadoop2.7/"
# Append the python dir to PYTHONPATH so that pyspark could be found
sys.path.append("C:/Users/avenugopal/Downloads/spark-2.2.0-bin-hadoop2.7/python/")
# Append the python/build to PYTHONPATH so that py4j could be found
sys.path.append('C:/Users/avenugopal/Downloads/spark-2.2.0-bin-hadoop2.7/python/build')
# try the import Spark Modules
try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql import SQLContext
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as fun

except ImportError as e:
    print("Error importing Spark Modules", e)
    sys.exit(1)

# Configure and start Spark on the local machine:

# if len(sys.argv) != 2:
#     print ( sys.stderr, "Usage: wordcount <file>")
#     exit(-1)
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("spark_wc").set("spark.driver.maxResultSize", 
"3g").set("spark.executor.memory", "3g")
sc = SparkContext(conf=conf)

sc.range(100)

my_path = "venv//reuters-extracted"
only_files = [f for f in listdir(my_path) if isfile(join(my_path, f))]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("sentence", StringType(), True),
    StructField("file_name", StringType(), True)
])
rowList = []
for index, file_name in enumerate(only_files):
    with open(os.path.join(my_path, file_name), 'r') as my_file:
        data = my_file.read().replace('\n', '')
    # (id, sentence, file_name) matching the schema above
    rowList.append((index, data, file_name))
spark = SparkSession.builder.appName("TokenizerExample").getOrCreate()
# $example on$
# sentenceDataFrame = spark.createDataFrame([
#     (0, "Hi I heard about Spark"),
#     (1, "I wish Java could use case classes"),
#     (2, "Logistic,regression,models,are,neat")
# ], ["id", "sentence"])
sentenceDataFrame = spark.createDataFrame(sc.parallelize(rowList), schema)

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="[^a-zA-Z]")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

regexTokenized = regexTokenizer.transform(sentenceDataFrame)

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
filtered = remover.transform(regexTokenized)
filtered = filtered.drop("sentence").drop("words")

dataType = ArrayType(StringType(), False)


def stemmer(words):
    stemmed_words = [lmtzr.stem(word) for word in words]
    return stemmed_words


stemmingFn = udf(stemmer, dataType)

stemmed = filtered.withColumn("stemmedWords", stemmingFn(fun.column("filtered")))

print(stemmed.rdd.getNumPartitions())
stemmed.select("stemmedWords").show(truncate=False)

cv = CountVectorizer(inputCol="stemmedWords", outputCol="features")

model = cv.fit(stemmed)
result = model.transform(stemmed)
result.select("features").show(truncate=False)
vocabulary = model.vocabulary
print(len(vocabulary))

idf = IDF(inputCol="features", outputCol="IDF_features")
idfModel = idf.fit(result)
rescaledData = idfModel.transform(result)

rescaledData.select("IDF_features").show(truncate=False)
print(rescaledData.rdd.getNumPartitions())


def filtering(vector):
    size = vector.size
    indices = vector.indices
    values = vector.values
    new_indices = []
    new_values = []
    for iterator, value in zip(indices, values):
        if value >= 0.8:
            new_indices.append(iterator)
            new_values.append(value)
    sparse_vector = SparseVector(size, new_indices, new_values)
    return sparse_vector


filterFn = udf(lambda vector: filtering(vector), VectorUDT())
filteredData = rescaledData.withColumn("filteredData", filterFn("IDF_features"))
filteredData = filteredData.drop("filtered")
filteredData.select("filteredData").show(truncate=False)


def token_extract(vector):
    indices = vector.indices
    tokens = []
    for iterator in indices:
        tokens.append(vocabulary[iterator])
    return tokens


token_extract_fn = udf(lambda vector: token_extract(vector), ArrayType(StringType()))

token_extracted_data = filteredData.withColumn("token_extracted_data", token_extract_fn("filteredData"))
token_extracted_data.select("token_extracted_data").show(truncate=False)

cv = CountVectorizer(inputCol="token_extracted_data", outputCol="newFeatures")

model = cv.fit(token_extracted_data)
result = model.transform(token_extracted_data)
result.select("newFeatures").show(truncate=False)
result = result.drop("filteredData")
vocabulary = model.vocabulary
print(len(vocabulary))


def hypernym_generation(token_array):
    print(token_array)
    hypernym_dictionary = {}
    for token in token_array:
        synsets = wn.synsets(token)
        hypernym_set = []
        for synset in synsets:
            hypernyms = synset.hypernyms()
            if len(hypernyms) > 0:
                for hypernym in hypernyms:
                    hypernym_set.append(hypernym.lemma_names()[0])
        # store the hypernyms for every token, not just the last one
        hypernym_dictionary[token] = hypernym_set
    return hypernym_dictionary


def convert(token_array):
    print(token_array)
    return hypernym_generation(token_array)


result.select("token_extracted_data").show(truncate=False)
hypernym_fn = udf(convert, MapType(StringType(), ArrayType(StringType(), True), True))
hypernym_extracted_data = result.withColumn("hypernym_extracted_data", hypernym_fn(fun.column("token_extracted_data")))
hypernym_extracted_data.select("hypernym_extracted_data").show(truncate=False)

sc.stop()



At each stage after stage 8, it produces:

Traceback (most recent call last):
  File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 216, in <module>
  File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 202, in main
  File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 577, in read_int
EOFError



I have no idea what is causing it.
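From the "task of very large size" warning quoted below, I suspect the oversized tasks come from building rowList on the driver and then parallelizing it, so every file's contents get serialized into the task closures. A minimal sketch of what I could try instead (reusing the sc, spark, schema and my_path defined above, and assuming sc.wholeTextFiles can read my local reuters-extracted folder):

import os

# Read every file on the executors instead of collecting the contents on the
# driver, so the text is not shipped inside each task closure.
files_rdd = sc.wholeTextFiles(my_path)            # RDD of (path, content) pairs
rows = files_rdd.zipWithIndex().map(
    lambda pair: (pair[1],                        # id
                  pair[0][1].replace('\n', ''),   # sentence, newlines stripped
                  os.path.basename(pair[0][0])))  # file_name
sentenceDataFrame = spark.createDataFrame(rows, schema)

This keeps each task small because only the file paths, not the file contents, travel with the job; whether it also makes the EOFError from the Python worker go away, I do not know.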

> Regarding max tax size
> ----------------------
>
>                 Key: SPARK-22802
>                 URL: https://issues.apache.org/jira/browse/SPARK-22802
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark
>    Affects Versions: 2.2.1
>         Environment: Windows,Pycharm
>            Reporter: Annamalai Venugopal
>              Labels: windows
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> 17/12/15 17:15:55 WARN TaskSetManager: Stage 15 contains a task of very large size (895 KB). The maximum recommended task size is 100 KB.
> Traceback (most recent call last):
>   File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 216, in <module>
>   File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 202, in main
>   File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 577, in read_int
> EOFError



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
