I am using pyspark 1.6.1 and python3
Any idea what my bug is? Clearly the indices are being sorted?
Could it be the numDimensions = 713912692155621377 and my indices are longs
not ints?
import numpy as np
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import VectorUDT
#sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# = 3 = size
# [0,1] int indices
#[1.0, 3.0] values
def toSparseVector(pojoList) :
indices = []
for pojo in pojoList :
indices.append(pojo.id)
sortedIndices = sorted(indices)
logical = np.ones(len(sortedIndices))
vec = Vectors.sparse(numDimensions, sortedIndices, logical)
return vec
newColName = "features"
myUDF = udf(toSparseVector, VectorUDT())
featuresDF = df.withColumn(newColName, myUDF(df["follows"]))
featuresDF.printSchema()
featuresDF.show()
root
|-- id: string (nullable = true)
|-- follows: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = false)
| | |-- screenName: string (nullable = false)
|-- features: vector (nullable = true)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-11-6f7c439ddd93> in <module>()
1 featuresDF.printSchema()
----> 2 featuresDF.show()
/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspa
rk/sql/dataframe.py in show(self, n, truncate)
255 +---+-----+
256 """
--> 257 print(self._jdf.showString(n, truncate))
258
259 def __repr__(self):
/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/p
y4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspa
rk/sql/utils.py in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()
/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/p
y4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client,
target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(
Py4JJavaError: An error occurred while calling o104.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0
(TID 212, localhost): org.apache.spark.api.python.PythonException: Traceback
(most recent call last):
File
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/p
yspark/worker.py", line 111, in main
process()
File
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/
pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/
pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/funct
ions.py", line 1563, in <lambda>
func = lambda _, it: map(lambda x: returnType.toInternal(f(*x)), it)
File "<ipython-input-10-4bc1e67834d6>", line 28, in toSparseVector
File
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/p
yspark/mllib/linalg/__init__.py", line 827, in sparse
return SparseVector(size, *args)
File
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/p
yspark/mllib/linalg/__init__.py", line 531, in __init__
raise TypeError("indices array must be sorted")
TypeError: indices array must be sorted