Single node spark issue in Sparkly/RStudio

2023-03-16 Thread elango vaidyanathan
Hi team,

On a single Linux node, I would like to set up RStudio with sparklyr. The
dev team consists of three to four people.
I am aware of a single-node Spark cluster's constraints. What I want to
know is what happens when more users join in and use sparklyr in RStudio
while Spark has a resource problem: it should simply keep the new jobs in
a queue rather than crashing.
I think this is not specific to RStudio/sparklyr; it applies equally to
Spark/PySpark on a single-node cluster.
Please share the optimal method for allocating Spark's resources in this
scenario.
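
For illustration, one way to keep extra jobs waiting on a single-node
standalone cluster is to cap each user's session so the master holds
further applications back instead of failing them. A minimal PySpark
sketch (the master URL and the core/memory caps are assumptions; tune
them to the machine):

```
from pyspark.sql import SparkSession

# Sketch only: assumes a standalone master running on this node at
# spark://localhost:7077, shared by 3-4 developers.
spark = (
    SparkSession.builder
    .appName("dev-user-session")
    .master("spark://localhost:7077")
    # Cap this application so it cannot grab the whole node; an app that
    # cannot get cores waits for resources instead of crashing.
    .config("spark.cores.max", "2")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)
```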


Thanks,
Elango


Re: Spark StructuredStreaming - watermark not working as expected

2023-03-16 Thread karan alang
FYI, the Apache Spark version is 3.1.3.

On Wed, Mar 15, 2023 at 4:34 PM karan alang  wrote:

> Hi Mich, this doesn't seem to be working for me; the watermark seems to
> be getting ignored!
>
> Here is the data put into Kafka:
>
> ```
> +----------------------------------------------------------------------------------------------------+----+
> |value                                                                                                |key |
> +----------------------------------------------------------------------------------------------------+----+
> |{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"}  |null|
> |{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"}  |null|
> |{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"}  |null|
> |{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}   |null|
> +----------------------------------------------------------------------------------------------------+----+
> ```
> Note: insert_ts specifies when the data was inserted
>
> Here is the output of the Structured Stream:
>
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +-------------------+-------------------+---------------+
> |startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
> +-------------------+-------------------+---------------+
> |2023-03-15 16:10:00|2023-03-15 16:15:00|27             |
> |2023-03-15 15:10:00|2023-03-15 15:15:00|14             |
> |2023-03-13 10:10:00|2023-03-13 10:15:00|6              |
> +-------------------+-------------------+---------------+
>
> Note: I'm summing up the temperatures (for easy verification)
>
> As per the above, all 3 'ts' values are included in the DataFrame, even
> though I added "ts":"2023-03-13T10:12:00.000-07:00" as the last record.
> Since the watermark is set to "5 minutes" and max(ts) ==
> 2023-03-15T16:12:00.000-07:00, the record with ts =
> "2023-03-13T10:12:00.000-07:00" should have been dropped: it is more
> than 2 days old (dated 2023-03-13), far behind the watermark cutoff of
> max(ts) - 5 minutes = 2023-03-15T16:07:00.000-07:00.
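>
> For illustration, the cutoff arithmetic as a quick sketch (using the
> timestamps above; timezone offsets dropped for brevity):
>
> ```
> from datetime import datetime, timedelta
>
> # Illustration only: the late-record check implied by a 5-minute watermark.
> max_ts = datetime.fromisoformat("2023-03-15T16:12:00")
> late_ts = datetime.fromisoformat("2023-03-13T10:12:00")
> cutoff = max_ts - timedelta(minutes=5)  # 2023-03-15 16:07:00
> print(late_ts < cutoff)  # True -> record falls behind the watermark
> ```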
>
> Any ideas what needs to be changed to make this work?
>
> Here is the code (modified for my requirement, but essentially the same)
> ```
>
> from pyspark.sql.functions import col, from_json, window
> from pyspark.sql.types import StructType, StructField, LongType, TimestampType
>
> schema = StructType([
>     StructField("temparature", LongType(), False),
>     StructField("ts", TimestampType(), False),
>     StructField("insert_ts", TimestampType(), False)
> ])
>
> streamingDataFrame = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", kafkaBrokers) \
>     .option("group.id", 'watermark-grp') \
>     .option("subscribe", topic) \
>     .option("failOnDataLoss", "false") \
>     .option("includeHeaders", "true") \
>     .option("startingOffsets", "latest") \
>     .load() \
>     .select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))
>
> resultC = streamingDataFrame.select(
>     col("parsed_value.ts").alias("timestamp"),
>     col("parsed_value.temparature").alias("temparature"),
>     col("parsed_value.insert_ts").alias("insert_ts"))
>
> # 5-minute watermark on event time, tumbling 5-minute windows
> resultM = resultC \
>     .withWatermark("timestamp", "5 minutes") \
>     .groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")) \
>     .agg({'temparature': 'sum'})
>
> resultMF = resultM \
>     .select(col("window.start").alias("startOfWindowFrame"),
>             col("window.end").alias("endOfWindowFrame"),
>             col("sum(temparature)").alias("Sum_Temperature"))
>
> # NB: 'complete' output mode re-emits every window's aggregate on each trigger
> result = resultMF \
>     .writeStream \
>     .outputMode('complete') \
>     .option("numRows", 1000) \
>     .option("truncate", "false") \
>     .format('console') \
>     .option('checkpointLocation', checkpoint_path) \
>     .queryName("sum_temparature") \
>     .start()
>
> result.awaitTermination()
>
> ```
>
>
>
> On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh  wrote:
>
>> OK
>>
>> ts is the timestamp, right?
>>
>> This is similar code that works out the average temperature over a
>> time frame of 5 minutes.
>>
>> Note the comments; errors are caught with try:
>>
>> try:
>>
>>     # construct a streaming dataframe streamingDataFrame that
>>     # subscribes to topic temperature
>>     streamingDataFrame = self.spark \
>>         .readStream \
>>         .format("kafka") \
>>         .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>>         .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>         .option("group.id", config['common']['appName']) \
>>         .option("z

Re: Understanding executor memory behavior

2023-03-16 Thread Sean Owen
All else equal, it is better to have the same resources in fewer, larger
executors. More tasks run local to other tasks, which helps performance,
and there is more scope for a task to 'borrow' spare memory and CPU
within the executor.
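
For illustration, the two shapes from the question expressed as executor
configs (a sketch; assumes YARN, and the totals match at 40 cores / 320G
either way):

```
from pyspark.sql import SparkSession

# Sketch for a YARN cluster: fewer, larger executors, 8 x (5 cores, 40G).
spark = (
    SparkSession.builder
    .appName("executor-shape-demo")
    .config("spark.executor.instances", "8")
    .config("spark.executor.cores", "5")
    .config("spark.executor.memory", "40g")
    # The alternative shape from the question, 40 x (1 core, 8G), would be
    # spark.executor.instances=40, spark.executor.cores=1,
    # spark.executor.memory=8g -- same totals, but more, smaller JVMs.
    .getOrCreate()
)
```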

On Thu, Mar 16, 2023, 2:14 PM Nikhil Goyal  wrote:

> Hi folks,
> I am trying to understand what the difference would be between running
> 8G 1-core executors vs 40G 5-core executors. I see that on YARN it can
> cause bin-packing issues, but other than that, are there any pros and
> cons to using either?
>
> Thanks
> Nikhil
>


Understanding executor memory behavior

2023-03-16 Thread Nikhil Goyal
Hi folks,
I am trying to understand what the difference would be between running
8G 1-core executors vs 40G 5-core executors. I see that on YARN it can
cause bin-packing issues, but other than that, are there any pros and
cons to using either?

Thanks
Nikhil