Re: Unable to use scala function in pyspark
Thank you Jeff! I would certainly give it a try.

Best,
Rahul

On 2021/09/26 22:49:03, Jeff Zhang wrote:
> Hi kumar,
>
> You can try Zeppelin, which supports UDF sharing across languages:
>
> http://zeppelin.apache.org/
>
> [snip: quoted original message and stack trace, included in full below]
Re: Unable to use scala function in pyspark
Thanks Sean,

- I have tried executing it without wrapping it in the DataFrame constructor, but
  got the same error:
  AttributeError: 'StructField' object has no attribute '_get_object_id'

- I have also tried a Scala UDF:

    class PythonUtil() extends UDF5[DataFrame, String, String, StructType, String, DataFrame]
        with Serializable {
      def call(t1: DataFrame, keyCol: String, set: String,
               outputSchema: StructType, namespace: String): DataFrame = {
        HelperFunctions.doTransformation(t1, keyCol, set, outputSchema, namespace)
      }
    }

  The corresponding Python section:

    spark.udf.registerJavaFunction('com.mytest.spark.PythonUtil',
                                   'com.mytest.spark.PythonUtil',
                                   pyspark.sql.dataframe.DataFrame())

  Here I'm not able to work out the correct syntax for the return type: a DataFrame
  apparently requires data and a SQLContext (unlike simple types such as StringType()).

- Since I was not able to pass a DataFrame between Scala and Python, I also tried
  working with a view in Scala, but didn't succeed there either.

Would you be able to point me to a sample where folks pass a DataFrame and a schema
to a Scala jar and get a DataFrame back?

Thanks,
Rahul

On 2021/09/26 23:21:36, Sean Owen wrote:
> [snip: Sean's reply, quoted below in his own message]
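For reference, the direct py4j route from the original post can work once every argument is JVM-convertible; the failure is specifically the PySpark StructType, which py4j cannot convert on its own. Below is a hedged sketch of one common workaround: serialize the schema to JSON and rebuild it on the JVM side with Spark's public DataType.fromJson. The package, object, and method names (com.mytest.spark.PythonUtil, customdf) are the thread's hypothetical names, not a real library.

```python
def call_scala_customdf(spark, df, key_col, table, schema, database):
    """Invoke the thread's (hypothetical) Scala helper through py4j.

    py4j cannot convert a pyspark.sql.types.StructType by itself --
    that is what raises "'StructField' object has no attribute
    '_get_object_id'". Spark schemas round-trip through JSON, so we
    rebuild the schema on the JVM side via DataType.fromJson.
    """
    from pyspark.sql import DataFrame  # deferred: needs pyspark at call time

    jvm = spark.sparkContext._jvm
    # Rebuild the schema as a JVM org.apache.spark.sql.types.StructType;
    # schema.json() is a plain string, which py4j converts without trouble.
    jschema = jvm.org.apache.spark.sql.types.DataType.fromJson(schema.json())
    # Package/object name from the thread; adjust to your own jar.
    scala_obj = jvm.com.mytest.spark.PythonUtil
    jdf = scala_obj.customdf(df._jdf, key_col, table, jschema, database)
    # Wrap the returned JVM Dataset back into a Python DataFrame
    # (on Spark 3.0.x the second argument is spark._wrapped, the SQLContext).
    return DataFrame(jdf, spark._wrapped)

# Usage (needs a live SparkSession with the jar on the driver classpath):
#   out = call_scala_customdf(spark, customerIdsDF, 'SSN', 'table',
#                             outputschema, 'test')
#   out.show()
```

Passing the schema by JSON string (and calling DataType.fromJson on the Scala side instead) works equally well; the only requirement is that nothing crossing the gateway is a bare PySpark type object.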
Re: Unable to use scala function in pyspark
You can also call a Scala UDF from Python in Spark - this doesn't need Zeppelin or relate to the front-end. This may indeed be much easier as a proper UDF; it depends on what this function does.

However, I think the issue may be that you're trying to wrap the resulting DataFrame in a DataFrame or something. First inspect what you get back from the invocation of the Scala method.

On Sun, Sep 26, 2021 at 5:50 PM Jeff Zhang wrote:
> [snip: Zeppelin suggestion and quoted original message]
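Sean's "first inspect what you get back" can be made concrete with a small helper. A sketch follows; the helper itself is pure Python, while the Spark-specific names in the usage comment (scala2_object, customerIdsDF, jschema) are the thread's hypothetical ones and need a live session.

```python
def describe_result(obj):
    """Report what a py4j call actually returned, before any wrapping.

    A JVM proxy (py4j JavaObject) forwards getClass() to the JVM, so we
    can ask for its runtime class name; anything else is a value py4j
    already converted into a plain Python object.
    """
    if hasattr(obj, "getClass"):
        # JVM-side object: ask the JVM for its runtime class name.
        return obj.getClass().getName()
    return "python:" + type(obj).__name__

# Usage against the thread's call (hypothetical names, live session needed):
#   jdf = scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table',
#                                jschema, 'test')
#   print(describe_result(jdf))  # expect "org.apache.spark.sql.Dataset"
#   df = pyspark.sql.DataFrame(jdf, spark._wrapped)  # only then wrap it

print(describe_result("already-converted"))  # -> python:str
```

Only once the result is confirmed to be a JVM Dataset does wrapping it in pyspark.sql.DataFrame make sense; wrapping anything else produces confusing downstream errors.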
Re: Unable to use scala function in pyspark
You can first try it via Docker:

http://zeppelin.apache.org/download.html#using-the-official-docker-image

Jeff Zhang wrote on Mon, Sep 27, 2021 at 6:49 AM:
> [snip: Zeppelin suggestion and quoted original message]
Re: Unable to use scala function in pyspark
Hi kumar,

You can try Zeppelin, which supports UDF sharing across languages:

http://zeppelin.apache.org/

rahul kumar wrote on Mon, Sep 27, 2021 at 4:20 AM:
> [snip: original message, included in full below]
Unable to use scala function in pyspark
I'm trying to use a function defined in a Scala jar from pyspark (Spark 3.0.2).

--- scala ---

object PythonUtil {

  def customdf(dataFrame: DataFrame,
               keyCol: String,
               table: String,
               outputSchema: StructType,
               database: String): DataFrame = {
    // some transformation of the dataframe, converting it to the output
    // schema's types and fields
    ...
    resultDF
  }
}

--- in the Jupyter notebook ---

Schema creation:

alias = StructType([StructField("first_name", StringType(), False),
                    StructField("last_name", StringType(), False)])
name = StructType([StructField("first_name", StringType(), False),
                   StructField("aliases", ArrayType(alias), False)])
street_adress = StructType([StructField("street_name", StringType(), False),
                            StructField("apt_number", IntegerType(), False)])
address = StructType([StructField("zip", LongType(), False),
                      StructField("street", street_adress, False),
                      StructField("city", StringType(), False)])
workHistory = StructType([StructField("company_name", StringType(), False),
                          StructField("company_address", address, False),
                          StructField("worked_from", StringType(), False)])

# passing this to the scala function
outputschema = StructType([StructField("name", name, False),
                           StructField("SSN", StringType(), False),
                           StructField("home_address", ArrayType(address), False)])

ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
        ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
customerIdsDF = spark.createDataFrame(ssns, ["SSN"])

scala2_object = sc._jvm.com.mytest.spark.PythonUtil
pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN',
                                             'table', outputschema, 'test'),
                      spark._wrapped)

Then I get the error:

AttributeError: 'StructField' object has no attribute '_get_object_id'

Full stack trace:

---
AttributeError                            Traceback (most recent call last)
<ipython-input> in <module>
      4
      5 scala2_object = sc._jvm.com.aerospike.spark.PythonUtil
----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table', smallSchema, 'test'), spark._wrapped)

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1294
   1295     def __call__(self, *args):
-> 1296         args_command, temp_args = self._build_args(*args)
   1297
   1298         command = proto.CALL_COMMAND_NAME +\

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self, *args)
   1258     def _build_args(self, *args):
   1259         if self.converters is not None and len(self.converters) > 0:
-> 1260             (new_args, temp_args) = self._get_args(args)
   1261         else:
   1262             new_args = args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _get_args(self, args)
   1245             for converter in self.gateway_client.converters:
   1246                 if converter.can_convert(arg):
-> 1247                     temp_arg = converter.convert(arg, self.gateway_client)
   1248                     temp_args.append(temp_arg)
   1249                     new_args.append(temp_arg)

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py in convert(self, object, gateway_client)
    509         java_list = ArrayList()
    510         for element in object:
--> 511             java_list.add(element)
    512         return java_list
    513

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1294
   1295     def __call__(self, *args):
-> 1296         args_command, temp_args = self._build_args(*args)
   1297
   1298         command = proto.CALL_COMMAND_NAME +\

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self, *args)
   1264
   1265         args_command = "".join(
-> 1266             [get_command_part(arg, self.pool) for arg in new_args])
   1267
   1268         return args_command, temp_args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in <listcomp>(.0)
   1264
   1265         args_command = "".join(
-> 1266             [get_command_part(arg, self.pool) for arg in new_args])
   1267
   1268         return args_command, temp_args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
    296             command_part += ";" + interface
    297         else:
--> 298             command_part = REFERENCE_TYPE + parameter._get_object_id()
    299
    300         command_part += "\n"

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
[ANNOUNCE] Release Apache Kyuubi(Incubating) 1.3.0-incubating
Hello Spark Community,

The Apache Kyuubi (Incubating) community is pleased to announce that Apache Kyuubi (Incubating) 1.3.0-incubating has been released!

Apache Kyuubi (Incubating) is a distributed, multi-tenant JDBC server for large-scale data processing and analytics, built on top of Apache Spark and designed to support more engines (e.g., Flink).

Kyuubi provides a pure SQL gateway through a Thrift JDBC/ODBC interface for end users to manipulate large-scale data with pre-programmed and extensible Spark SQL engines.

We aim to make Kyuubi an "out-of-the-box" tool for data warehouses and data lakes. This "out-of-the-box" model minimizes the barriers and costs for end users to use Spark on the client side. On the server side, Kyuubi's multi-tenant server and engine architecture gives administrators a way to achieve computing resource isolation, data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.3.0-incubating.html

To learn more about Apache Kyuubi (Incubating), please see https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/incubator-kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community and the Incubator community who made this release possible!

Thanks,
On behalf of the Apache Kyuubi (Incubating) community