Thanks Sean,

- I have tried executing it without wrapping into DataFrame constructor, but 
got the same error "
AttributeError: 'StructField' object has no attribute '_get_object_id'

- I have also tried using udf
   scala udf: 
 class PythonUtil() extends UDF5[DataFrame, String,String, StructType, String, 
DataFrame]  with Serializable {
   def call(t1: DataFrame,  keyCol: String,
                    set: String,
                    outputSchema: StructType, namespace: String): DataFrame = {
     HelperFunctions.doTransformation(t1,keyCol,set,outputSchema,namespace)
  }
}

corresponding python section
 
spark.udf.registerJavaFunction('com.mytest.spark.PythonUtil','com.mytest.spark.PythonUtil',
 pyspark.sql.dataframe.DataFrame())

Here in the return type, I'm not able to provide correct syntax of DataFrame. 
It appears dataframe requires data and sqlctxt ( unlike simple types like 
StringType() etc.. )

- Since I was not pass dataframe between scala and python, I tried working on 
view in scala, however didn't succeed there as well.

Would be able to point me to some sample where folks are passing dataframe and 
schema to scala jar and getting dataframe back.

Thanks,
Rahul 

On 2021/09/26 23:21:36, Sean Owen <sro...@gmail.com> wrote: 
> You can also call a Scala UDF from Python in Spark - this doesn't need
> Zeppelin or relate to the front-end.
> This may indeed be much easier as a proper UDF; depends on what this
> function does.
> However I think the issue may be that you're trying to wrap the resulting
> DataFrame in a DataFrame or something. First inspect what you get back from
> the invocation of the Scala method.
> 
> 
> On Sun, Sep 26, 2021 at 5:50 PM Jeff Zhang <zjf...@gmail.com> wrote:
> 
> > Hi kumar,
> >
> > You can try Zeppelin which support the udf sharing across languages
> >
> > http://zeppelin.apache.org/
> >
> >
> >
> >
> > rahul kumar <rk20.stor...@gmail.com> 于2021年9月27日周一 上午4:20写道:
> >
> >> I'm trying to use a function defined in scala jar in pyspark ( spark
> >> 3.0.2).
> >>
> >> --scala ---
> >>
> >> Object PythonUtil {
> >>
> >> def customedf(dataFrame: DataFrame,
> >>                  keyCol: String,
> >>                  table: String,
> >>                  outputSchema: StructType,
> >>                  database: String): DataFrame = {
> >>
> >> // some transformation of dataframe and convert as per the output schema
> >> types and fields.
> >> ...
> >> resultDF
> >> }
> >>
> >> //In jupyter notebook
> >> schema creation:
> >> alias = StructType([StructField("first_name", StringType(),
> >> False),StructField("last_name", StringType(), False)])
> >> name = StructType([StructField("first_name", StringType(),
> >> False),StructField("aliases", ArrayType(alias), False)])
> >> street_adress = StructType([StructField("street_name", StringType(),
> >> False),StructField("apt_number", IntegerType(), False)])
> >> address = StructType([StructField("zip", LongType(),
> >> False),StructField("street", street_adress, False),StructField("city",
> >> StringType(), False)])
> >> workHistory = StructType([StructField("company_name", StringType(),
> >> False),StructField("company_address", address,
> >> False),StructField("worked_from", StringType(), False)])
> >>
> >> //passing this to scala function.
> >> outputschema= StructType([StructField("name", name,
> >> False),StructField("SSN", StringType(), False),StructField("home_address",
> >> ArrayType(address), False)])
> >>
> >> ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
> >> ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
> >> customerIdsDF=spark.createDataFrame(ssns,["SSN"])
> >>
> >> scala2_object= sc._jvm.com.mytest.spark.PythonUtil
> >> pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN',
> >> 'table', outputschema, 'test'), spark._wrapped)
> >>
> >> Then I get an error that AttributeError: 'StructField' object has no
> >> attribute '_get_object_id'
> >>
> >> full stacktrace
> >>
> >> ---------------------------------------------------------------------------
> >> AttributeError                            Traceback (most recent call
> >> last)
> >> <ipython-input-25-74a3b3e652e6> in <module>
> >>       4
> >>       5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil
> >> ----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf,
> >> 'SSN', 'table',smallSchema, 'test'), spark._wrapped)
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> >> in __call__(self, *args)
> >>    1294
> >>    1295     def __call__(self, *args):
> >> -> 1296         args_command, temp_args = self._build_args(*args)
> >>    1297
> >>    1298         command = proto.CALL_COMMAND_NAME +\
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> >> in _build_args(self, *args)
> >>    1258     def _build_args(self, *args):
> >>    1259         if self.converters is not None and len(self.converters) >
> >> 0:
> >> -> 1260             (new_args, temp_args) = self._get_args(args)
> >>    1261         else:
> >>    1262             new_args = args
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> >> in _get_args(self, args)
> >>    1245                 for converter in self.gateway_client.converters:
> >>    1246                     if converter.can_convert(arg):
> >> -> 1247                         temp_arg = converter.convert(arg,
> >> self.gateway_client)
> >>    1248                         temp_args.append(temp_arg)
> >>    1249                         new_args.append(temp_arg)
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py
> >> in convert(self, object, gateway_client)
> >>     509         java_list = ArrayList()
> >>     510         for element in object:
> >> --> 511             java_list.add(element)
> >>     512         return java_list
> >>     513
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> >> in __call__(self, *args)
> >>    1294
> >>    1295     def __call__(self, *args):
> >> -> 1296         args_command, temp_args = self._build_args(*args)
> >>    1297
> >>    1298         command = proto.CALL_COMMAND_NAME +\
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> >> in _build_args(self, *args)
> >>    1264
> >>    1265         args_command = "".join(
> >> -> 1266             [get_command_part(arg, self.pool) for arg in
> >> new_args])
> >>    1267
> >>    1268         return args_command, temp_args
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> >> in <listcomp>(.0)
> >>    1264
> >>    1265         args_command = "".join(
> >> -> 1266             [get_command_part(arg, self.pool) for arg in
> >> new_args])
> >>    1267
> >>    1268         return args_command, temp_args
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py
> >> in get_command_part(parameter, python_proxy_pool)
> >>     296             command_part += ";" + interface
> >>     297     else:
> >> --> 298         command_part = REFERENCE_TYPE + parameter._get_object_id()
> >>     299
> >>     300     command_part += "\n"
> >>
> >>
> >>
> >>
> >>
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
> 

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to