Thank you, Jeff! I will certainly give it a try. Best, Rahul
On 2021/09/26 22:49:03, Jeff Zhang <zjf...@gmail.com> wrote: > Hi Kumar, > > You can try Zeppelin, which supports sharing UDFs across languages > > http://zeppelin.apache.org/ > > > > > rahul kumar <rk20.stor...@gmail.com> 于2021年9月27日周一 上午4:20写道: > > > I'm trying to use a function defined in a Scala jar from PySpark (Spark > > 3.0.2). > > > > --scala --- > > > > Object PythonUtil { > > > > def customedf(dataFrame: DataFrame, > > keyCol: String, > > table: String, > > outputSchema: StructType, > > database: String): DataFrame = { > > > > // some transformation of dataframe and convert as per the output schema > > types and fields. > > ... > > resultDF > > } > > > > //In jupyter notebook > > schema creation: > > alias = StructType([StructField("first_name", StringType(), > > False),StructField("last_name", StringType(), False)]) > > name = StructType([StructField("first_name", StringType(), > > False),StructField("aliases", ArrayType(alias), False)]) > > street_adress = StructType([StructField("street_name", StringType(), > > False),StructField("apt_number", IntegerType(), False)]) > > address = StructType([StructField("zip", LongType(), > > False),StructField("street", street_adress, False),StructField("city", > > StringType(), False)]) > > workHistory = StructType([StructField("company_name", StringType(), > > False),StructField("company_address", address, > > False),StructField("worked_from", StringType(), False)]) > > > > //passing this to scala function.
> > outputschema= StructType([StructField("name", name, > > False),StructField("SSN", StringType(), False),StructField("home_address", > > ArrayType(address), False)]) > > > > ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"], > > ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]] > > customerIdsDF=spark.createDataFrame(ssns,["SSN"]) > > > > scala2_object= sc._jvm.com.mytest.spark.PythonUtil > > pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN', > > 'table', outputschema, 'test'), spark._wrapped) > > > > Then I get an error that AttributeError: 'StructField' object has no > > attribute '_get_object_id' > > > > full stacktrace > > --------------------------------------------------------------------------- > > AttributeError Traceback (most recent call last) > > <ipython-input-25-74a3b3e652e6> in <module> > > 4 > > 5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil > > ----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, > > 'SSN', 'table',smallSchema, 'test'), spark._wrapped) > > > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > > in __call__(self, *args) > > 1294 > > 1295 def __call__(self, *args): > > -> 1296 args_command, temp_args = self._build_args(*args) > > 1297 > > 1298 command = proto.CALL_COMMAND_NAME +\ > > > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > > in _build_args(self, *args) > > 1258 def _build_args(self, *args): > > 1259 if self.converters is not None and len(self.converters) > > > 0: > > -> 1260 (new_args, temp_args) = self._get_args(args) > > 1261 else: > > 1262 new_args = args > > > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > > in _get_args(self, args) > > 1245 for converter in self.gateway_client.converters: > > 1246 if converter.can_convert(arg): > > -> 1247 temp_arg = converter.convert(arg, > > self.gateway_client) > > 1248 temp_args.append(temp_arg) > > 1249 
new_args.append(temp_arg) > > > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py > > in convert(self, object, gateway_client) > > 509 java_list = ArrayList() > > 510 for element in object: > > --> 511 java_list.add(element) > > 512 return java_list > > 513 > > > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > > in __call__(self, *args) > > 1294 > > 1295 def __call__(self, *args): > > -> 1296 args_command, temp_args = self._build_args(*args) > > 1297 > > 1298 command = proto.CALL_COMMAND_NAME +\ > > > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > > in _build_args(self, *args) > > 1264 > > 1265 args_command = "".join( > > -> 1266 [get_command_part(arg, self.pool) for arg in new_args]) > > 1267 > > 1268 return args_command, temp_args > > > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > > in <listcomp>(.0) > > 1264 > > 1265 args_command = "".join( > > -> 1266 [get_command_part(arg, self.pool) for arg in new_args]) > > 1267 > > 1268 return args_command, temp_args > > > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py > > in get_command_part(parameter, python_proxy_pool) > > 296 command_part += ";" + interface > > 297 else: > > --> 298 command_part = REFERENCE_TYPE + parameter._get_object_id() > > 299 > > 300 command_part += "\n" > > > > > > > > > > > > > > --------------------------------------------------------------------- > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > > > > > -- > Best Regards > > Jeff Zhang > --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org