Re: Unable to use scala function in pyspark

2021-09-26 Thread rahul kumar
Thank you Jeff! I would certainly give it a try. 

Best,
Rahul

On 2021/09/26 22:49:03, Jeff Zhang  wrote: 
> Hi kumar,
> 
> You can try Zeppelin, which supports UDF sharing across languages
> 
> http://zeppelin.apache.org/

Re: Unable to use scala function in pyspark

2021-09-26 Thread rahul kumar
Thanks Sean,

- I have tried executing it without wrapping the result in the DataFrame constructor, but 
got the same error: AttributeError: 'StructField' object has no attribute '_get_object_id'

- I have also tried using a UDF.
   Scala UDF:

   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.api.java.UDF5
   import org.apache.spark.sql.types.StructType

   class PythonUtil() extends UDF5[DataFrame, String, String, StructType, String, DataFrame]
       with Serializable {
     def call(t1: DataFrame, keyCol: String, set: String,
              outputSchema: StructType, namespace: String): DataFrame = {
       HelperFunctions.doTransformation(t1, keyCol, set, outputSchema, namespace)
     }
   }

   Corresponding Python section:

   spark.udf.registerJavaFunction('com.mytest.spark.PythonUtil',
                                  'com.mytest.spark.PythonUtil',
                                  pyspark.sql.dataframe.DataFrame())

Here, in the return type, I'm not able to provide the correct syntax for a DataFrame; 
it appears a DataFrame requires data and a SQLContext (unlike simple types such as 
StringType()).

- Since I was not able to pass a DataFrame between Scala and Python, I tried working 
with a view in Scala, but didn't succeed there either.

Would you be able to point me to a sample where folks pass a DataFrame and a schema to 
a Scala jar and get a DataFrame back?
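
Concretely, the round trip I'm after would look roughly like the sketch below (untested; 
it assumes the jar with com.mytest.spark.PythonUtil is on the driver classpath, and that 
converting the Python StructType to a JVM StructType via its JSON form is an acceptable 
way to hand the schema across, since py4j cannot convert StructField objects on its own):

    import pyspark.sql

    # Build a JVM-side StructType from the Python-side schema;
    # DataType.fromJson parses the JSON produced by StructType.json().
    jvm = spark.sparkContext._jvm
    java_schema = jvm.org.apache.spark.sql.types.DataType.fromJson(outputschema.json())

    # Call the Scala object's method with the JVM DataFrame and the JVM schema...
    scala_util = jvm.com.mytest.spark.PythonUtil
    jdf = scala_util.customedf(customerIdsDF._jdf, "SSN", "table", java_schema, "test")

    # ...and wrap the returned JVM DataFrame back into a PySpark DataFrame
    # (spark._wrapped is the SQLContext in Spark 3.0.x).
    result_df = pyspark.sql.DataFrame(jdf, spark._wrapped)
    result_df.show()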

Thanks,
Rahul 

On 2021/09/26 23:21:36, Sean Owen  wrote: 
> You can also call a Scala UDF from Python in Spark - this doesn't need
> Zeppelin or relate to the front-end.
> This may indeed be much easier as a proper UDF; depends on what this
> function does.
> However I think the issue may be that you're trying to wrap the resulting
> DataFrame in a DataFrame or something. First inspect what you get back from
> the invocation of the Scala method.

Re: Unable to use scala function in pyspark

2021-09-26 Thread Sean Owen
You can also call a Scala UDF from Python in Spark - this doesn't need
Zeppelin or relate to the front-end.
This may indeed be much easier as a proper UDF; depends on what this
function does.
However I think the issue may be that you're trying to wrap the resulting
DataFrame in a DataFrame or something. First inspect what you get back from
the invocation of the Scala method.
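
For example, something along these lines (names taken from the original post; java_schema 
stands in for a JVM-side schema object, assuming the call itself gets that far):

    import pyspark.sql

    raw = sc._jvm.com.mytest.spark.PythonUtil.customedf(
        customerIdsDF._jdf, "SSN", "table", java_schema, "test")

    # If the Scala method really returned a DataFrame, py4j hands back a proxy object.
    print(type(raw))                 # expect py4j.java_gateway.JavaObject
    print(raw.getClass().getName())  # expect org.apache.spark.sql.Dataset

    # Only then wrap it on the Python side.
    df = pyspark.sql.DataFrame(raw, spark._wrapped)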


On Sun, Sep 26, 2021 at 5:50 PM Jeff Zhang  wrote:

> Hi kumar,
>
> You can try Zeppelin, which supports UDF sharing across languages
>
> http://zeppelin.apache.org/

Re: Unable to use scala function in pyspark

2021-09-26 Thread Jeff Zhang
You can first try it via Docker:
http://zeppelin.apache.org/download.html#using-the-official-docker-image



Re: Unable to use scala function in pyspark

2021-09-26 Thread Jeff Zhang
Hi kumar,

You can try Zeppelin, which supports UDF sharing across languages

http://zeppelin.apache.org/
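
For example, with the default shared SparkContext, a Scala paragraph and a PySpark 
paragraph in the same note can exchange a DataFrame through a temp view. A rough sketch 
(the paragraph layout and the view name are only examples):

    # %spark.pyspark paragraph
    # Assumes an earlier %spark (Scala) paragraph in the same note ran something like:
    #   val resultDF = PythonUtil.customedf(df, "SSN", "table", outputSchema, "test")
    #   resultDF.createOrReplaceTempView("result_view")
    # Because the %spark interpreters share one SparkSession, the view is visible here.
    result = spark.table("result_view")
    result.show()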





Unable to use scala function in pyspark

2021-09-26 Thread rahul kumar
I'm trying to use a function defined in a Scala jar from PySpark (Spark 3.0.2).

--scala ---

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

object PythonUtil {

  def customedf(dataFrame: DataFrame,
                keyCol: String,
                table: String,
                outputSchema: StructType,
                database: String): DataFrame = {
    // some transformation of the dataframe, converted to the output schema's
    // types and fields
    ...
    resultDF
  }
}

# In the Jupyter notebook: schema creation
import pyspark.sql
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, LongType, ArrayType)

alias = StructType([StructField("first_name", StringType(), False),
                    StructField("last_name", StringType(), False)])
name = StructType([StructField("first_name", StringType(), False),
                   StructField("aliases", ArrayType(alias), False)])
street_adress = StructType([StructField("street_name", StringType(), False),
                            StructField("apt_number", IntegerType(), False)])
address = StructType([StructField("zip", LongType(), False),
                      StructField("street", street_adress, False),
                      StructField("city", StringType(), False)])
workHistory = StructType([StructField("company_name", StringType(), False),
                          StructField("company_address", address, False),
                          StructField("worked_from", StringType(), False)])

# passing this to the scala function
outputschema = StructType([StructField("name", name, False),
                           StructField("SSN", StringType(), False),
                           StructField("home_address", ArrayType(address), False)])

ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
        ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
customerIdsDF = spark.createDataFrame(ssns, ["SSN"])

scala2_object = sc._jvm.com.mytest.spark.PythonUtil
pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN',
                                             'table', outputschema, 'test'),
                      spark._wrapped)

Then I get the error AttributeError: 'StructField' object has no attribute 
'_get_object_id'.

full stacktrace
---
AttributeError                            Traceback (most recent call last)
 in 
  4 
  5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil
> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN', 
'table',smallSchema, 'test'), spark._wrapped)

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
   1294 
   1295 def __call__(self, *args):
-> 1296 args_command, temp_args = self._build_args(*args)
   1297 
   1298 command = proto.CALL_COMMAND_NAME +\

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
 in _build_args(self, *args)
   1258 def _build_args(self, *args):
   1259 if self.converters is not None and len(self.converters) > 0:
-> 1260 (new_args, temp_args) = self._get_args(args)
   1261 else:
   1262 new_args = args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
 in _get_args(self, args)
   1245 for converter in self.gateway_client.converters:
   1246 if converter.can_convert(arg):
-> 1247 temp_arg = converter.convert(arg, 
self.gateway_client)
   1248 temp_args.append(temp_arg)
   1249 new_args.append(temp_arg)

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py
 in convert(self, object, gateway_client)
509 java_list = ArrayList()
510 for element in object:
--> 511 java_list.add(element)
512 return java_list
513 

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
   1294 
   1295 def __call__(self, *args):
-> 1296 args_command, temp_args = self._build_args(*args)
   1297 
   1298 command = proto.CALL_COMMAND_NAME +\

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
 in _build_args(self, *args)
   1264 
   1265 args_command = "".join(
-> 1266 [get_command_part(arg, self.pool) for arg in new_args])
   1267 
   1268 return args_command, temp_args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
 in (.0)
   1264 
   1265 args_command = "".join(
-> 1266 [get_command_part(arg, self.pool) for arg in new_args])
   1267 
   1268 return args_command, temp_args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py
 in get_command_part(parameter, python_proxy_pool)
296 command_part += ";" + interface
297 else:
--> 298 command_part = REFERENCE_TYPE + parameter._get_object_id()
299 
300 command_part += "\n"






-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[ANNOUNCE] Release Apache Kyuubi(Incubating) 1.3.0-incubating

2021-09-26 Thread Cheng Pan
Hello Spark Community,

The Apache Kyuubi(Incubating) community is pleased to announce that
Apache Kyuubi(Incubating) 1.3.0-incubating has been released!

Apache Kyuubi(Incubating) is a distributed multi-tenant JDBC server for
large-scale data processing and analytics, built on top of Apache Spark
and designed to support more engines (e.g. Flink).

Kyuubi provides a pure SQL gateway through a Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines.
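
For example, any HiveServer2-compatible client can talk to the gateway. A rough sketch
with PyHive (host, user, and query below are placeholders; 10009 is Kyuubi's default
frontend port):

    from pyhive import hive

    # Placeholder connection details; adjust for your deployment.
    conn = hive.connect(host="kyuubi.example.com", port=10009, username="spark_user")
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
    print(cursor.fetchall())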

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and data lakes.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark at the client side.

On the server side, the multi-tenant architecture of the Kyuubi server and engines
provides administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.3.0-incubating.html

To learn more about Apache Kyuubi (Incubating), please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/incubator-kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community and
Incubating
community who made this release possible!

Thanks,
On behalf of Apache Kyuubi(Incubating) community