Re: API Problem

2022-06-09 Thread Sean Owen
That repartition seems to do nothing? But yes, the key point is to use col().

On Thu, Jun 9, 2022, 9:41 PM Stelios Philippou  wrote:

> Perhaps
>
>
> finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn("status_for_batch
>
> To
>
> finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn(col("status_for_batch")
>
>
>
>
> On Thu, 9 Jun 2022, 22:32 Sid,  wrote:
>
>> Hi Experts,
>>
>> I am facing a problem while passing a column to a method. The
>> problem is described in detail here:
>>
>>
>> https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
>>
>> TIA,
>> Sid
>>
>


Spark streaming / confluent Kafka- messages are empty

2022-06-09 Thread KhajaAsmath Mohammed


Hi,

I am trying to read data from Confluent Kafka using the Avro schema registry.
The messages are always empty and the stream always shows empty records. Any
suggestions on this, please?

Thanks,
Asmath



Re: API Problem

2022-06-09 Thread Stelios Philippou
Perhaps

finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn("status_for_batch

To

finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn(col("status_for_batch")




On Thu, 9 Jun 2022, 22:32 Sid,  wrote:

> Hi Experts,
>
> I am facing a problem while passing a column to a method. The problem
> is described in detail here:
>
>
> https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
>
> TIA,
> Sid
>


API Problem

2022-06-09 Thread Sid
Hi Experts,

I am facing a problem while passing a column to a method. The problem
is described in detail here:

https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark

TIA,
Sid


Re: to find Difference of locations in Spark Dataframe rows

2022-06-09 Thread Bjørn Jørgensen
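If KM is kilometre, then 6371 is the Earth's mean radius in kilometres and
12742 is its diameter (2 * 6371), so
val distance = atan2(sqrt(a), sqrt(-a + 1)) * 2 * 6371
already returns kilometres. It is equivalent to
val distance = atan2(sqrt(a), sqrt(-a + 1)) * 12742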

Have a look at this gist: "Spherical distance calculation based on latitude
and longitude with Apache Spark"
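
One way to check which constant is right is to compute a distance whose value
is known in advance. The following is a minimal sketch in plain Python (no
Spark involved) that mirrors the shape of the quoted Scala formula. The names
haversine_km and within_radius, and the sample points, are illustrative
assumptions and not part of the original code. One degree of longitude along
the equator should come out at 2 * pi * 6371 / 360 km if 6371 is the radius
in kilometres.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius in kilometres


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in degrees."""
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) ** 2)
    # Same shape as the quoted Scala: atan2(sqrt(a), sqrt(1 - a)) * 2 * R
    return math.atan2(math.sqrt(a), math.sqrt(1 - a)) * 2 * EARTH_RADIUS_KM


def within_radius(origin, candidates, radius_km=10.0):
    """Keep the (name, lat, lon) candidates at most radius_km from origin.

    origin is a (lat, lon) pair; this is the one-to-many comparison
    written as a plain list filter.
    """
    return [c for c in candidates
            if haversine_km(origin[0], origin[1], c[1], c[2]) <= radius_km]


# Sanity check: one degree of longitude along the equator.
one_degree = haversine_km(0.0, 0.0, 0.0, 1.0)
expected = 2 * math.pi * EARTH_RADIUS_KM / 360
print(one_degree, expected)

# Hypothetical sample points, only to exercise the filter.
points = [("near", 0.0, 0.05), ("far", 0.0, 1.0)]
print(within_radius((0.0, 0.0), points))
```

In Spark, the same one-to-many idea is commonly expressed as a cross join of
the search row against the remaining rows, followed by a filter on the
computed distance column; the list filter above only illustrates the logic.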


On Tue, 7 Jun 2022 at 19:39, Chetan Khatri wrote:

> Hi Spark users,
>
> It has been many years since I last worked with Spark, so please help me.
> Thanks very much.
>
> I have different cities and their coordinates in a DataFrame[Row]. I want
> to find the distance in km between them and then show only those
> records/cities that are within 10 km.
>
> I have written a function that finds the distance in km between two
> coordinates, but I don't know how to apply it to rows in a one-to-many
> fashion and calculate the distance.
>
> Here is some code that I wrote; sorry that it is so basic.
>
> import org.apache.spark.sql.{Column, SparkSession}
> import org.apache.spark.sql.functions._
>
> object HouseMatching {
>   def main(args: Array[String]): Unit = {
>
>     val search_property_id = args(0)
>
>     // list of columns where the condition should be exact match
>     val groupOneCriteria = List(
>       "occupied_by_tenant",
>       "water_index",
>       "electricity_index",
>       "elevator_index",
>       "heating_index",
>       "nb_bathtubs",
>       "nb_showers",
>       "nb_wc",
>       "nb_rooms",
>       "nb_kitchens"
>     )
>     // list of columns where the condition should be matching 80%
>     val groupTwoCriteria = List(
>       "area",
>       "home_condition",
>       "building_age"
>     )
>     // list of columns where the condition should be found using Euclidean distance
>     val groupThreeCriteria = List(
>       "postal_code"
>     )
>
>     val region_or_city = "region"
>
>     // Haversine distance between two points; inputs are latitude/longitude columns in degrees
>     def haversineDistance(destination_latitude: Column, destination_longitude: Column,
>                           origin_latitude: Column, origin_longitude: Column): Column = {
>       val a = pow(sin(radians(destination_latitude - origin_latitude) / 2), 2) +
>         cos(radians(origin_latitude)) * cos(radians(destination_latitude)) *
>           pow(sin(radians(destination_longitude - origin_longitude) / 2), 2)
>       val distance = atan2(sqrt(a), sqrt(-a + 1)) * 2 * 6371
>       distance
>     }
>
>     val spark = SparkSession.builder().appName("real-estate-property-matcher")
>       .getOrCreate()
>
>     // Assumes the CSV has a header row; without it the columns are named _c0, _c1, ...
>     val housingDataDF = spark.read.option("header", "true")
>       .csv("~/Downloads/real-estate-sample-data.csv")
>
>     // searching for the property by `ref_id`
>     val searchPropertyDF = housingDataDF.filter(col("ref_id") === search_property_id)
>
>     // Similar house in the same city (same postal code) and group one condition
>     val similarHouseAndSameCity = housingDataDF.join(searchPropertyDF,
>       groupThreeCriteria ++ groupOneCriteria, "inner")
>
> // Similar house not in the same city but 10km range
>
>

-- 
Bjørn Jørgensen


Re: Retrieve the count of spark nodes

2022-06-09 Thread Poorna Murali
Thanks Stephen! I will try this out.

On Thu, 9 Jun, 2022, 6:02 am Stephen Coy,  wrote:

> Hi there,
>
> We use something like:
>
> /*
>  * Force Spark to initialise the defaultParallelism by executing a dummy
>  * parallel operation and then return the resulting defaultParallelism.
>  */
> // JavaSparkContext is used here because its parallelize() accepts a java.util.List
> private int getWorkerCount(JavaSparkContext sparkContext) {
>     sparkContext.parallelize(List.of(1, 2, 3, 4)).collect();
>     return sparkContext.defaultParallelism();
> }
>
>
>
> It's useful for setting certain pool sizes dynamically, such as:
>
> sparkContext.hadoopConfiguration().set("fs.s3a.connection.maximum", Integer.toString(workerCount * 2));
>
>
> This works in our Spark 3.0.1 code; we are just migrating to 3.2.1 now.
>
> Cheers,
>
> Steve C
>
> On 8 Jun 2022, at 4:28 pm, Poorna Murali  wrote:
>
> Hi,
>
> I would like to know if it is possible to get the count of live master
> and worker Spark nodes running in a system.
>
> Please help to clarify the same.
>
> Thanks,
> Poorna
>
>