Hi 

I am running into a strange error. I am trying to write a transformer that
takes in two columns and creates a LabeledPoint. I cannot figure out why I
am getting

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

I am using spark-1.5.1-bin-hadoop2.6

Any idea what I am doing wrong? Is this a bug with data frames?
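
One thought I had while writing this up: maybe the problem is that line 44 of
my code (below) passes a whole DataFrame into the UDF call instead of Column
objects. Here is a rough, untested sketch of what I think that line might need
to look like instead, assuming struct() is the right way to bundle the two
input columns into a single argument for the UDF:

    # untested guess: hand the UDF a struct() of the two columns, not a DataFrame
    from pyspark.sql.functions import struct, udf

    def _transform(self, dataset):
        out_col = self.getOutputCol()
        labelCol = self.getInputCol()
        featureCol = self.getFeatureCol()

        def f(row):
            # row should be a Row holding just the label and feature fields
            return str(LabeledPoint(row[labelCol], row[featureCol]))

        # pass Column arguments so the UDF call gets something it understands
        return dataset.withColumn(out_col,
                                  udf(f, StringType())(struct(labelCol, featureCol)))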

Also, I suspect the next problem I will run into is that UDFs do not seem to
support LabeledPoint?
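
What I mean is that udf() seems to want a Spark SQL DataType for the return
value, and I have not found one that corresponds to LabeledPoint, so the best
workaround I have come up with so far is declaring StringType and returning
str(LabeledPoint(...)), roughly (again untested, just my guess at a workaround):

    # no SQL type for LabeledPoint that I know of, so return its string form
    makeLp = udf(lambda label, features: str(LabeledPoint(label, features)),
                 StringType())
    withLp = lpData.withColumn("labeledPoint",
                               makeLp(lpData["label"], lpData["features"]))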

Comments and suggestions are greatly appreciated

Andy




In [37]:
 1  from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
 2  from pyspark.ml.util import keyword_only
 3
 4  from pyspark.sql.functions import udf
 5  from pyspark.ml.pipeline import Transformer
 6
 7  from pyspark.sql.types import BinaryType, DataType, ByteType, StringType
 8  from pyspark.mllib.linalg import SparseVector
 9  from pyspark.mllib.regression import LabeledPoint
10
11
12  class LabledPointTransformer(Transformer, HasInputCol, HasOutputCol):
13      @keyword_only
14      def __init__(self, inputCol=None, outputCol=None, featureCol=None):
15          super(LabledPointTransformer, self).__init__()
16          self.featureCol = Param(self, "featureCol", "")
17          self._setDefault(featureCol="feature")
18          kwargs = self.__init__._input_kwargs
19          self.setParams(**kwargs)
20
21      @keyword_only
22      def setParams(self, inputCol=None, outputCol=None, featureCol=None):
23          kwargs = self.setParams._input_kwargs
24          return self._set(**kwargs)
25
26      def setFeatureCol(self, value):
27          self._paramMap[self.featureCol] = value
28          return self
29
30      def getFeatureCol(self):
31          return self.getOrDefault(self.featureCol)
32
33      def _transform(self, dataset):  # dataset is a data frame
34          out_col = self.getOutputCol()
35          labelCol = self.getInputCol()
36          featureCol = self.getFeatureCol()
37
38          def f(lf):
39              return str(LabeledPoint(lf[labelCol], lf[featureCol]))
40
41          t = StringType()
42          #data = dataset[labelCol, featureCol]
43          data = dataset.select(labelCol, featureCol)
44          return dataset.withColumn(out_col, udf(f, t)(data))
45
46  lpData = sqlContext.createDataFrame([
47      (0, SparseVector(3, [0, 1], [1.0, 2.0])),
48      (1, SparseVector(3, [1, 2], [3.0, 1.0])),
49      ], ["label", "features"])
50
51  lpData.show()
52  lpt = LabledPointTransformer(inputCol="label", outputCol="labeledPoint", featureCol="features",)
53  tmp = lpt.transform(lpData)
54  tmp.collect()
+-----+-------------------+
|label|           features|
+-----+-------------------+
|    0|(3,[0,1],[1.0,2.0])|
|    1|(3,[1,2],[3.0,1.0])|
+-----+-------------------+

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-37-0ed0d16e8db3> in <module>()
     51 lpData.show()
     52 lpt = LabledPointTransformer(inputCol="label", outputCol="labeledPoint", featureCol="features",)
---> 53 tmp = lpt.transform(lpData)
     54 tmp.collect()

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/ml/pipeline.py in transform(self, dataset, params)
    105                 return self.copy(params,)._transform(dataset)
    106             else:
--> 107                 return self._transform(dataset)
    108         else:
    109             raise ValueError("Params must be either a param map but got %s." % type(params))

<ipython-input-37-0ed0d16e8db3> in _transform(self, dataset)
     42         #data = dataset[labelCol, featureCol]
     43         data = dataset.select(labelCol, featureCol)
---> 44         return dataset.withColumn(out_col, udf(f, t)(data))
     45 
     46 lpData = sqlContext.createDataFrame([

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/functions.py in __call__(self, *cols)
   1436     def __call__(self, *cols):
   1437         sc = SparkContext._active_spark_context
-> 1438         jc = self._judf.apply(_to_seq(sc, cols, _to_java_column))
   1439         return Column(jc)
   1440 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.py in _to_seq(sc, cols, converter)
     58     """
     59     if converter:
---> 60         cols = [converter(c) for c in cols]
     61     return sc._jvm.PythonUtils.toSeq(cols)
     62 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.py in <listcomp>(.0)
     58     """
     59     if converter:
---> 60         cols = [converter(c) for c in cols]
     61     return sc._jvm.PythonUtils.toSeq(cols)
     62 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.py in _to_java_column(col)
     46         jcol = col._jc
     47     else:
---> 48         jcol = _create_column_from_name(col)
     49     return jcol
     50 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.py in _create_column_from_name(name)
     39 def _create_column_from_name(name):
     40     sc = SparkContext._active_spark_context
---> 41     return sc._jvm.functions.col(name)
     42 
     43 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    527 
    528         args_command = ''.join(
--> 529                 [get_command_part(arg, self.pool) for arg in new_args])
    530 
    531         command = CALL_COMMAND_NAME +\

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in <listcomp>(.0)
    527 
    528         args_command = ''.join(
--> 529                 [get_command_part(arg, self.pool) for arg in new_args])
    530 
    531         command = CALL_COMMAND_NAME +\

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
    263             command_part += ';' + interface
    264     else:
--> 265         command_part = REFERENCE_TYPE + parameter._get_object_id()
    266 
    267     command_part += '\n'

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py in __getattr__(self, name)
    747         if name not in self.columns:
    748             raise AttributeError(
--> 749                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
    750         jc = self._jdf.apply(name)
    751         return Column(jc)

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

