You can also call a Scala function from Python in plain Spark; that doesn't require Zeppelin or have anything to do with the front end. Depending on what this function does, it may indeed be much easier to expose it as a proper UDF. That said, I think the issue may be that you're wrapping something in a DataFrame that isn't the JVM DataFrame you expect. As a first step, inspect what the invocation of the Scala method actually returns.
On Sun, Sep 26, 2021 at 5:50 PM Jeff Zhang <zjf...@gmail.com> wrote:

> Hi kumar,
>
> You can try Zeppelin, which supports UDF sharing across languages:
>
> http://zeppelin.apache.org/
>
> rahul kumar <rk20.stor...@gmail.com> wrote on Mon, Sep 27, 2021 at 4:20 AM:
>
>> I'm trying to use a function defined in a Scala jar from PySpark (Spark
>> 3.0.2).
>>
>> --- Scala ---
>>
>> object PythonUtil {
>>
>>   def customedf(dataFrame: DataFrame,
>>                 keyCol: String,
>>                 table: String,
>>                 outputSchema: StructType,
>>                 database: String): DataFrame = {
>>     // some transformation of the dataframe, converting it to match
>>     // the output schema's fields and types
>>     ...
>>     resultDF
>>   }
>> }
>>
>> # In a Jupyter notebook
>> # Schema creation:
>> alias = StructType([StructField("first_name", StringType(), False),
>>                     StructField("last_name", StringType(), False)])
>> name = StructType([StructField("first_name", StringType(), False),
>>                    StructField("aliases", ArrayType(alias), False)])
>> street_address = StructType([StructField("street_name", StringType(), False),
>>                              StructField("apt_number", IntegerType(), False)])
>> address = StructType([StructField("zip", LongType(), False),
>>                       StructField("street", street_address, False),
>>                       StructField("city", StringType(), False)])
>> workHistory = StructType([StructField("company_name", StringType(), False),
>>                           StructField("company_address", address, False),
>>                           StructField("worked_from", StringType(), False)])
>>
>> # Passing this to the Scala function:
>> outputschema = StructType([StructField("name", name, False),
>>                            StructField("SSN", StringType(), False),
>>                            StructField("home_address", ArrayType(address), False)])
>>
>> ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
>>         ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
>> customerIdsDF = spark.createDataFrame(ssns, ["SSN"])
>>
>> scala2_object = sc._jvm.com.mytest.spark.PythonUtil
>> pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN',
>>                                              'table', outputschema, 'test'),
>>                       spark._wrapped)
>>
>> Then I get this error: AttributeError: 'StructField' object has no
>> attribute '_get_object_id'
>>
>> Full stack trace:
>>
>> ---------------------------------------------------------------------------
>> AttributeError                            Traceback (most recent call last)
>> <ipython-input-25-74a3b3e652e6> in <module>
>>       4
>>       5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil
>> ----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table', smallSchema, 'test'), spark._wrapped)
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>    1294
>>    1295     def __call__(self, *args):
>> -> 1296         args_command, temp_args = self._build_args(*args)
>>    1297
>>    1298         command = proto.CALL_COMMAND_NAME +\
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self, *args)
>>    1258     def _build_args(self, *args):
>>    1259         if self.converters is not None and len(self.converters) > 0:
>> -> 1260             (new_args, temp_args) = self._get_args(args)
>>    1261         else:
>>    1262             new_args = args
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _get_args(self, args)
>>    1245         for converter in self.gateway_client.converters:
>>    1246             if converter.can_convert(arg):
>> -> 1247                 temp_arg = converter.convert(arg, self.gateway_client)
>>    1248                 temp_args.append(temp_arg)
>>    1249             new_args.append(temp_arg)
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py in convert(self, object, gateway_client)
>>     509         java_list = ArrayList()
>>     510         for element in object:
>> --> 511             java_list.add(element)
>>     512         return java_list
>>     513
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>    1294
>>    1295     def __call__(self, *args):
>> -> 1296         args_command, temp_args = self._build_args(*args)
>>    1297
>>    1298         command = proto.CALL_COMMAND_NAME +\
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self, *args)
>>    1264
>>    1265         args_command = "".join(
>> -> 1266             [get_command_part(arg, self.pool) for arg in new_args])
>>    1267
>>    1268         return args_command, temp_args
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in <listcomp>(.0)
>>    1264
>>    1265         args_command = "".join(
>> -> 1266             [get_command_part(arg, self.pool) for arg in new_args])
>>    1267
>>    1268         return args_command, temp_args
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
>>     296             command_part += ";" + interface
>>     297         else:
>> --> 298             command_part = REFERENCE_TYPE + parameter._get_object_id()
>>     299
>>     300     command_part += "\n"
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>
> --
> Best Regards
>
> Jeff Zhang