Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-24 Thread Anil Dasari
It appears that structured streaming and DStream have entirely different
microbatch metadata representations.
Can someone assist me in finding the Structured Streaming equivalents of the
following DStream microbatch metadata?

1. Microbatch timestamp: structured streaming foreachBatch gives a batchId,
which is not a timestamp. Is there a way to get the microbatch timestamp ?
(See the listener sketch below.)
2. Microbatch start event ?
3. Scheduling delay of a microbatch ?
4. Pending microbatches in case of fixed-interval microbatches ?
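
For item 1, one option is a StreamingQueryListener that records each batch's
reported trigger timestamp keyed by batchId. This is only an illustrative
sketch added here for reference; the map-based correlation is an assumption,
not something from this thread:

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Collects the ISO-8601 trigger timestamp of every completed microbatch,
// keyed by batchId, so foreachBatch code can look it up later.
object BatchTimestamps {
  val byBatchId = new ConcurrentHashMap[Long, String]()
}

val spark = SparkSession.builder().getOrCreate()
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    BatchTimestamps.byBatchId.put(event.progress.batchId, event.progress.timestamp)
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
```

The caveat discussed later in this thread applies: the listener fires
asynchronously, so an entry may not be present yet when foreachBatch runs for
that batchId.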

Thanks

On Wed, May 22, 2024 at 5:23 PM Anil Dasari  wrote:

> You are right.
> - Another question on migration: is there a way to get the microbatch id
> during the microbatch dataset `transform` operation, like in the RDD transform ?
> I am attempting to implement the following pseudo functionality with
> structured streaming. In this approach, recordCategoriesMetadata is fetched,
> and RDD metrics like RDD size etc. are emitted, using the microbatch id in the
> transform operation.
> ```code
> val rddQueue = new mutable.Queue[RDD[Int]]()
> // source components
> val sources = Seq.empty[String]
> val consolidatedDstream = sources
>   .map { source =>
>     val inputStream = ssc.queueStream(rddQueue)
>     inputStream.transform { (rdd, ts) =>
>       // emit metrics of microbatch ts: rdd size etc.
>       val recordCategories = rdd.map(..).collect()
>       val recordCategoriesMetadata = ...
>       rdd.map { r =>
>         val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>         (source, customRecord)
>       }
>     }
>   }
>   .reduceLeft(_ union _)
>
> consolidatedDstream
>   .foreachRDD { (rdd, ts) =>
>     // get pipes for each source
>     val pipes = Seq.empty[String] // pipes of the given source
>     pipes.foreach { pipe =>
>       val pipeSource = null // get from the pipe variable
>       val psRDD = rdd.filter {
>         case (source, sourceRDD) => source.equals(pipeSource)
>       }
>       // apply pipe transformation and sink
>     }
>   }
> ```
>
> In Structured Streaming, it could look like this:
>
> ```code
> val consolidatedStream = sources
>   .map { source =>
>     val inputStream = ... // (for each source)
>     inputStream
>   }
>   .reduceLeft(_ union _)
>
> consolidatedStream
>   .writeStream
>   .foreachBatch { (ds, batchId) =>
>     val newDS = ds.transform { internalDS =>
>       // emit metrics of microbatch batchId: dataset size etc.
>       val recordCategories = internalDS.map(..).collect()
>       val recordCategoriesMetadata = ...
>       internalDS.map { r =>
>         val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>         (source, customRecord)
>       }
>     }
>     // get pipes for each source
>     val pipes = Seq.empty[String] // pipes of the given source
>     pipes.foreach { pipe =>
>       val pipeSource = null // get from the pipe variable
>       val psDS = newDS.filter {
>         case (source, sourceDS) => source.equals(pipeSource)
>       }
>       // apply pipe transformation and sink
>     }
>   }
> ```
> The above is just pseudocode, and I am still not sure whether it works. Let me
> know your suggestions, if any. Thanks.
>
> On Wed, May 22, 2024 at 8:34 AM Tathagata Das 
> wrote:
>
>> The right way to associate microbatches when committing to external
>> storage is to use the microbatch id that you can get in foreachBatch. That
>> microbatch id guarantees that the data produced in the batch is always
>> the same no matter the recomputations (assuming all processing logic is
>> deterministic). So you can commit the batch id + batch data together. And
>> then async commit the batch id + offsets.
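
As an illustration of the pattern described above (a sketch added for
reference; alreadyCommitted, commitBatch, and commitOffsets are hypothetical
helpers for the external store, not Spark API):

```scala
// Hypothetical external-store helpers; only foreachBatch itself is Spark API.
def alreadyCommitted(batchId: Long): Boolean = ???
def commitBatch(batchId: Long, df: org.apache.spark.sql.DataFrame): Unit = ???
def commitOffsets(batchId: Long): Unit = ???

df.writeStream
  .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
    // Recomputed batches reuse the same batchId, so the commit is idempotent.
    if (!alreadyCommitted(batchId)) {
      commitBatch(batchId, batchDF) // batch id + batch data together
      commitOffsets(batchId)        // then (possibly async) batch id + offsets
    }
  }
  .start()
```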
>>
>> On Wed, May 22, 2024 at 11:27 AM Anil Dasari 
>> wrote:
>>
>>> Thanks Das, Mich.
>>>
>>> Mitch,
>>> We process data from Kafka and write it to S3 in Parquet format using
>>> Dstreams. To ensure exactly-once delivery and prevent data loss, our
>>> process records micro-batch offsets to an external storage at the end of
>>> each micro-batch in foreachRDD, which is then used when the job restarts.
>>>
>>> Das,
>>> Thanks for sharing the details. I will look into them.
>>> Unfortunately, the listener processing is async and can't
>>> guarantee a happens-before association with the microbatch to commit offsets to
>>> external storage. But they will still work. Is there a way to access
>>> lastProgress in foreachBatch ?
>>>
>>>
>>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> If you want to find what offset ranges are present in a microbatch in
>>>> Structured Streaming, you have to look at the
>>>> StreamingQuery.lastProgress or use the QueryProgressListener
>>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>>>> Both of these approaches give you access to the SourceProgress
>>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>,
>>>> which gives Kafka offsets as a JSON string.

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
You are right.
- Another question on migration: is there a way to get the microbatch id
during the microbatch dataset `transform` operation, like in the RDD transform ?
I am attempting to implement the following pseudo functionality with
structured streaming. In this approach, recordCategoriesMetadata is fetched,
and RDD metrics like RDD size etc. are emitted, using the microbatch id in the
transform operation.
```code
val rddQueue = new mutable.Queue[RDD[Int]]()
// source components
val sources = Seq.empty[String]
val consolidatedDstream = sources
  .map { source =>
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.transform { (rdd, ts) =>
      // emit metrics of microbatch ts: rdd size etc.
      val recordCategories = rdd.map(..).collect()
      val recordCategoriesMetadata = ...
      rdd.map { r =>
        val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
        (source, customRecord)
      }
    }
  }
  .reduceLeft(_ union _)

consolidatedDstream
  .foreachRDD { (rdd, ts) =>
    // get pipes for each source
    val pipes = Seq.empty[String] // pipes of the given source
    pipes.foreach { pipe =>
      val pipeSource = null // get from the pipe variable
      val psRDD = rdd.filter {
        case (source, sourceRDD) => source.equals(pipeSource)
      }
      // apply pipe transformation and sink
    }
  }
```

In Structured Streaming, it could look like this:

```code
val consolidatedStream = sources
  .map { source =>
    val inputStream = ... // (for each source)
    inputStream
  }
  .reduceLeft(_ union _)

consolidatedStream
  .writeStream
  .foreachBatch { (ds, batchId) =>
    val newDS = ds.transform { internalDS =>
      // emit metrics of microbatch batchId: dataset size etc.
      val recordCategories = internalDS.map(..).collect()
      val recordCategoriesMetadata = ...
      internalDS.map { r =>
        val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
        (source, customRecord)
      }
    }
    // get pipes for each source
    val pipes = Seq.empty[String] // pipes of the given source
    pipes.foreach { pipe =>
      val pipeSource = null // get from the pipe variable
      val psDS = newDS.filter {
        case (source, sourceDS) => source.equals(pipeSource)
      }
      // apply pipe transformation and sink
    }
  }
```
The above is just pseudocode, and I am still not sure whether it works. Let me
know your suggestions, if any. Thanks.

On Wed, May 22, 2024 at 8:34 AM Tathagata Das 
wrote:

> The right way to associate microbatches when committing to external
> storage is to use the microbatch id that you can get in foreachBatch. That
> microbatch id guarantees that the data produced in the batch is always
> the same no matter the recomputations (assuming all processing logic is
> deterministic). So you can commit the batch id + batch data together. And
> then async commit the batch id + offsets.
>
> On Wed, May 22, 2024 at 11:27 AM Anil Dasari 
> wrote:
>
>> Thanks Das, Mich.
>>
>> Mitch,
>> We process data from Kafka and write it to S3 in Parquet format using
>> Dstreams. To ensure exactly-once delivery and prevent data loss, our
>> process records micro-batch offsets to an external storage at the end of
>> each micro-batch in foreachRDD, which is then used when the job restarts.
>>
>> Das,
>> Thanks for sharing the details. I will look into them.
>> Unfortunately, the listener processing is async and can't guarantee a
>> happens-before association with the microbatch to commit offsets to external
>> storage. But they will still work. Is there a way to access lastProgress in
>> foreachBatch ?
>>
>>
>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> If you want to find what offset ranges are present in a microbatch in
>>> Structured Streaming, you have to look at the
>>> StreamingQuery.lastProgress or use the QueryProgressListener
>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>>> Both of these approaches give you access to the SourceProgress
>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>,
>>> which gives Kafka offsets as a JSON string.
>>>
>>> Hope this helps!
>>>
>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> OK, to understand better: your current model relies on streaming data
>>>> input through a Kafka topic, Spark does some ETL, and you send it to a sink, a
>>>> database or file storage like HDFS etc.?
>>>>
>>>> Your current architecture relies on Direct Streams (DStream) and RDDs,
>>>> and you want to move to Spark Structured Streaming based on dataframes and
>>>> datasets?
>>>>
>>>> You have not specified your sink.
>>>>
>>>> With regard to your question:

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
Thanks Das, Mich.

Mitch,
We process data from Kafka and write it to S3 in Parquet format using
Dstreams. To ensure exactly-once delivery and prevent data loss, our
process records micro-batch offsets to an external storage at the end of
each micro-batch in foreachRDD, which is then used when the job restarts.

Das,
Thanks for sharing the details. I will look into them.
Unfortunately, the listener processing is async and can't guarantee a
happens-before association with the microbatch to commit offsets to external
storage. But they will still work. Is there a way to access lastProgress in
foreachBatch ?
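
As far as I know, lastProgress lives on the StreamingQuery handle rather than
being passed into foreachBatch, so the closest thing is reading it from the
query object. A hedged sketch, with the caveat that lastProgress may still
describe the previous batch:

```scala
// Poll the query handle outside foreachBatch. lastProgress may lag the batch
// currently running inside foreachBatch, so treat it as best-effort metadata
// rather than a transactional source of offsets.
val query = df.writeStream
  .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
    // batch processing ...
  }
  .start()

val progress = query.lastProgress // null until the first batch completes
if (progress != null) {
  progress.sources.foreach { src =>
    // endOffset is a JSON string, e.g. {"topic":{"0":1234}}
    println(s"batch=${progress.batchId} endOffset=${src.endOffset}")
  }
}
```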


On Wed, May 22, 2024 at 7:35 AM Tathagata Das 
wrote:

> If you want to find what offset ranges are present in a microbatch in
> Structured Streaming, you have to look at the StreamingQuery.lastProgress or
> use the QueryProgressListener
> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
> Both of these approaches give you access to the SourceProgress
> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>,
> which gives Kafka offsets as a JSON string.
>
> Hope this helps!
>
> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> OK, to understand better: your current model relies on streaming data input
>> through a Kafka topic, Spark does some ETL, and you send it to a sink, a
>> database or file storage like HDFS etc.?
>>
>> Your current architecture relies on Direct Streams (DStream) and RDDs, and
>> you want to move to Spark Structured Streaming based on dataframes and
>> datasets?
>>
>> You have not specified your sink.
>>
>> With regard to your question:
>>
>> "Is there an equivalent of DStream HasOffsetRanges in structured
>> streaming to get the microbatch end offsets to the checkpoint in our
>> external checkpoint store ?"
>>
>> There is no direct equivalent of DStream HasOffsetRanges in Spark
>> Structured Streaming. However, Structured Streaming provides mechanisms to
>> achieve similar functionality.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>> View my LinkedIn profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>> https://en.everybodywiki.com/Mich_Talebzadeh
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions" (Wernher von Braun
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>
>>
>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>  wrote:
>>
>>> Hello,
>>>
>>> what options are you considering yourself?
>>>
>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>> adas...@guidewire.com> wrote:
>>>
>>>
>>> Hello,
>>>
>>> We are on Spark 3.x, using Spark DStream + Kafka, and planning to move to
>>> Structured Streaming + Kafka.
>>> Is there an equivalent of DStream HasOffsetRanges in Structured Streaming
>>> to get the microbatch end offsets to checkpoint in our external
>>> checkpoint store ? Thanks in advance.
>>>
>>> Regards
>>>
>>>


Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
Hello,

We are on Spark 3.x, using Spark DStream + Kafka, and planning to move to
Structured Streaming + Kafka.
Is there an equivalent of DStream HasOffsetRanges in Structured Streaming to
get the microbatch end offsets to checkpoint in our external checkpoint
store ? Thanks in advance.

Regards


[Spark-SQL] Dataframe write saveAsTable failed

2023-06-26 Thread Anil Dasari
Hi,

We recently upgraded Spark from 2.4.x to 3.3.1, and managed table
creation while writing a dataframe via saveAsTable fails with the error below.

Can not create the managed table(``). The associated
location('hdfs:') already exists.

At a high level, our code does the following before writing the dataframe as a table:

sparkSession.sql(s"DROP TABLE IF EXISTS $hiveTableName PURGE")
mydataframe.write.mode(SaveMode.Overwrite).saveAsTable(hiveTableName)

The above code works with Spark 2 because of
spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation, which is
deprecated in Spark 3.

The table is dropped and purged before writing the dataframe, so I expected
the dataframe write not to complain that the path already exists.

After digging further, I noticed there is a `_temporary` folder present in the
HDFS table path.

dfs -ls /apps/hive/warehouse//
Found 1 items
drwxr-xr-x   - hadoop hdfsadmingroup  0 2023-06-23 04:45
/apps/hive/warehouse//_temporary

[root@ip-10-121-107-90 bin]# hdfs dfs -ls
/apps/hive/warehouse//_temporary
Found 1 items
drwxr-xr-x   - hadoop hdfsadmingroup  0 2023-06-23 04:45
/apps/hive/warehouse//_temporary/0

[root@ip-10-121-107-90 bin]# hdfs dfs -ls
/apps/hive/warehouse//_temporary/0
Found 1 items
drwxr-xr-x   - hadoop hdfsadmingroup  0 2023-06-23 04:45
/apps/hive/warehouse//_temporary/0/_temporary

Is it because of task failures ? Is there a way to work around this issue ?
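
One workaround to try (a sketch, under the assumption that the leftover
`_temporary` directory is what trips the non-empty-location check; the table
path below is a hypothetical placeholder):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

val tableLocation = new Path("/apps/hive/warehouse/my_table") // hypothetical path
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

spark.sql(s"DROP TABLE IF EXISTS $hiveTableName PURGE")
// Clear any leftover files (e.g., a stale _temporary dir from failed tasks)
// so the managed-table location is truly empty before the write.
if (fs.exists(tableLocation)) {
  fs.delete(tableLocation, true)
}
mydataframe.write.mode(SaveMode.Overwrite).saveAsTable(hiveTableName)
```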

Thanks


[Spark streaming]: Microbatch id in logs

2023-06-25 Thread Anil Dasari
Hi,
I am using the Spark 3.3.1 distribution and Spark Streaming in my application. Is
there a way to add a microbatch id to all logs generated by Spark and the Spark
application ?
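
For the application's own driver-side logs, one approach is tagging log lines
with the batch time via SLF4J's MDC (a sketch; `stream` is a placeholder
DStream, and Spark 3.1+ also supports propagating `mdc.*` local properties to
executor task logs, mentioned here only as a pointer):

```scala
import org.slf4j.MDC

stream.foreachRDD { (rdd, batchTime) =>
  // Expose the microbatch id to the logging layout; add %X{batchId} to the
  // log4j PatternLayout so it appears on every line logged in this scope.
  MDC.put("batchId", batchTime.milliseconds.toString)
  try {
    // per-batch driver-side logic
  } finally {
    MDC.remove("batchId")
  }
}
```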

Thanks.


Re: Spark streaming pending microbatches queue max length

2022-07-13 Thread Anil Dasari
Retry.

From: Anil Dasari 
Date: Tuesday, July 12, 2022 at 3:42 PM
To: user@spark.apache.org 
Subject: Spark streaming pending microbatches queue max length
Hello,

Spark adds an entry to the pending microbatches queue at each batch interval.
Is there a config to set the max size of the pending microbatches queue ?

Thanks


Spark streaming pending microbatches queue max length

2022-07-12 Thread Anil Dasari
Hello,

Spark adds an entry to the pending microbatches queue at each batch interval.
Is there a config to set the max size of the pending microbatches queue ?
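
As far as I know there is no config that directly caps the pending-batch queue
length; the usual approach is to bound how fast batches fill up instead. A
sketch with illustrative values:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Adapt the ingestion rate to the processing rate so batches stop piling up.
  .set("spark.streaming.backpressure.enabled", "true")
  // Hard cap on records pulled per Kafka partition per second.
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
```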

Thanks


Re: {EXT} Re: Spark sql slowness in Spark 3.0.1

2022-04-15 Thread Anil Dasari
Hello,

The DF is checkpointed here, so it is written to HDFS. The DF is written in parquet
format with the default parallelism.

Thanks.

From: wilson 
Date: Thursday, April 14, 2022 at 2:54 PM
To: user@spark.apache.org 
Subject: {EXT} Re: Spark sql slowness in Spark 3.0.1
just curious, where to write?


Anil Dasari wrote:
> We are upgrading Spark from 2.4.7 to 3.0.1. We use Spark SQL (Hive) to
> checkpoint data frames (intermediate data). DF writes are very slow in
> 3.0.1 compared to 2.4.7.
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Spark sql slowness in Spark 3.0.1

2022-04-14 Thread Anil Dasari
Hello,

We are upgrading Spark from 2.4.7 to 3.0.1. We use Spark SQL (Hive) to
checkpoint data frames (intermediate data). DF writes are very slow in 3.0.1
compared to 2.4.7.
I have read the release notes and there were no major changes except managed
tables and adaptive scheduling. We are not using adaptive scheduling and are going
with the default config. We made changes to handle managed tables by adding
explicit paths during writes and deletes.

Do you have any suggestions to debug and fix the slowness problem ?

Thanks,



Re: {EXT} Re: Spark Parquet write OOM

2022-03-05 Thread Anil Dasari
I am not sure how to set the records limit. Let me check. I couldn't find a
parquet row group size configuration in Spark.
For now, I increased the number of shuffle partitions to reduce the records
processed by each task to avoid OOM.
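
For reference, the Parquet row group size is controlled by the Hadoop
configuration key parquet.block.size (a sketch; the 32 MB value is
illustrative, and the Parquet default is 128 MB):

```scala
// Smaller row groups mean smaller in-memory column buffers in the writer,
// at the cost of more row groups per file.
spark.sparkContext.hadoopConfiguration
  .setInt("parquet.block.size", 32 * 1024 * 1024) // 32 MB

dataFrame.write.mode("overwrite").parquet(path)
```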

Regards,
Anil

From: Gourav Sengupta
Date: Saturday, March 5, 2022 at 1:59 AM
To: Anil Dasari
Cc: Yang,Jie(INF), user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Anil,

any chance you tried setting the limit on the number of records to be written 
out at a time?

Regards,
Gourav

On Thu, Mar 3, 2022 at 3:12 PM Anil Dasari <adas...@guidewire.com> wrote:
Hi Gourav,
Tried increasing shuffle partitions number and higher executor memory. Both 
didn’t work.

Regards

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Thursday, March 3, 2022 at 2:24 AM
To: Anil Dasari <adas...@guidewire.com>
Cc: Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi,

I do not think that you are doing anything particularly concerning here.

There is a setting in SPARK which limits the number of records that we can
write out at a time; you can try that. The other thing that you can try is to
ensure that the number of partitions is higher (just like you suggested). Let me
know how things are going on your end.


Regards,
Gourav Sengupta


On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari <adas...@guidewire.com> wrote:
Answers in the context. Thanks.

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Thursday, March 3, 2022 at 12:13 AM
To: Anil Dasari <adas...@guidewire.com>
Cc: Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Anil,

I was trying to work out things for a while yesterday, but may need your kind 
help.

Can you please share the code for the following steps?
*  Create DF from hive (from step #c)
[AD] sparkSession.table()

*  Deduplicate spark DF by primary key
[AD] dataFrame.dropDuplicates()

*  Write DF to s3 in parquet format
[AD] dataFrame.write.mode(saveMode).parquet(path)

*  Write metadata to s3
[AD] metadata in json written to s3 using aws sdk
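
Put together, the answers above amount to roughly this (a sketch; the table
name, key column, and output path are placeholders, not from the thread):

```scala
import org.apache.spark.sql.SaveMode

val df = spark.table("checkpoint_db.intermediate_table")   // hypothetical table
val deduped = df.dropDuplicates("primary_key")             // hypothetical key column
deduped.write.mode(SaveMode.Overwrite).parquet("s3://bucket/output/") // placeholder
```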

Regards,
Gourav Sengupta

On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari <adas...@guidewire.com> wrote:
2nd attempt..

Any suggestions to troubleshoot and fix the problem ? thanks in advance.

Regards,
Anil

From: Anil Dasari <adas...@guidewire.com>
Date: Wednesday, March 2, 2022 at 7:00 AM
To: Gourav Sengupta <gourav.sengu...@gmail.com>, Yang,Jie(INF) <yangji...@baidu.com>
Cc: user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Gourav and Yang,
Thanks for the response.

Please find the answers below.

1. What is the version of SPARK you are using?
[AD] : Spark 2.4.7 (EMR 5.33.1)

2. Are you doing a lot of in-memory transformations like adding columns, or
running joins, or UDFs thus increasing the size of the data before writing out?
[AD] No. Only one new column is added. Our flow is:

  1.  Read avro data from kafka
  2.  Avro deserialization and add the new column to the RDD
  3.  Create a spark dataframe (DF) against the latest schema (avro evolved
schema) and persist it to hive (checkpointing)
  4.  Create a DF from hive (from step #c)
  5.  Deduplicate the spark DF by primary key
  6.  Write the DF to s3 in parquet format
  7.  Write metadata to s3

The failure is from the spark batch job.

3. Is your pipeline going to change or evolve soon, or the data volumes going
to vary, or particularly increase, over time?
[AD] : Data volume is fixed as it is a batch job.

4. What is the memory that you are having in your executors, and drivers?
[AD] We run one core node and 50 task nodes, i.e. 51 nodes in total; each
node can create 2 executors (2-core CPU and 8 GB memory).

5. Can you show the list of transformations that you are running?
[AD] No explicit transformations other than the basic map transformations required
to create the dataframe from the avro record RDD.

Please let me know if you have any questions.

Regards,
Anil

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Wednesday, March 2, 2022 at 1:07 AM
To: Yang,Jie(INF) <yangji...@baidu.com>
Cc: Anil Dasari <adas...@guidewire.com>, user@spark.apache.org
Subject: {EXT} Re: Spark Parquet write OOM
Hi Anil,

before jumping to the quick symptomatic fix, can we try to understand the 
issues?

1. What is the version of SPARK you are using?
2. Are you doing a lot of in-memory transformations like adding columns, or
running joins, or UDFs thus increasing the size of the data before writing out?

Re: {EXT} Re: Spark Parquet write OOM

2022-03-03 Thread Anil Dasari
Hi Gourav,
Tried increasing the number of shuffle partitions and using higher executor
memory. Both didn't work.

Regards

From: Gourav Sengupta
Date: Thursday, March 3, 2022 at 2:24 AM
To: Anil Dasari
Cc: Yang,Jie(INF), user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi,

I do not think that you are doing anything particularly concerning here.

There is a setting in SPARK which limits the number of records that we can
write out at a time; you can try that. The other thing that you can try is to
ensure that the number of partitions is higher (just like you suggested). Let me
know how things are going on your end.


Regards,
Gourav Sengupta


On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari <adas...@guidewire.com> wrote:
Answers in the context. Thanks.

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Thursday, March 3, 2022 at 12:13 AM
To: Anil Dasari <adas...@guidewire.com>
Cc: Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Anil,

I was trying to work out things for a while yesterday, but may need your kind 
help.

Can you please share the code for the following steps?
*  Create DF from hive (from step #c)
[AD] sparkSession.table()

*  Deduplicate spark DF by primary key
[AD] dataFrame.dropDuplicates()

*  Write DF to s3 in parquet format
[AD] dataFrame.write.mode(saveMode).parquet(path)

*  Write metadata to s3
[AD] metadata in json written to s3 using aws sdk

Regards,
Gourav Sengupta

On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari <adas...@guidewire.com> wrote:
2nd attempt..

Any suggestions to troubleshoot and fix the problem ? thanks in advance.

Regards,
Anil

From: Anil Dasari <adas...@guidewire.com>
Date: Wednesday, March 2, 2022 at 7:00 AM
To: Gourav Sengupta <gourav.sengu...@gmail.com>, Yang,Jie(INF) <yangji...@baidu.com>
Cc: user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Gourav and Yang,
Thanks for the response.

Please find the answers below.

1. What is the version of SPARK you are using?
[AD] : Spark 2.4.7 (EMR 5.33.1)

2. Are you doing a lot of in-memory transformations like adding columns, or
running joins, or UDFs thus increasing the size of the data before writing out?
[AD] No. Only one new column is added. Our flow is:

  1.  Read avro data from kafka
  2.  Avro deserialization and add the new column to the RDD
  3.  Create a spark dataframe (DF) against the latest schema (avro evolved
schema) and persist it to hive (checkpointing)
  4.  Create a DF from hive (from step #c)
  5.  Deduplicate the spark DF by primary key
  6.  Write the DF to s3 in parquet format
  7.  Write metadata to s3

The failure is from the spark batch job.

3. Is your pipeline going to change or evolve soon, or the data volumes going
to vary, or particularly increase, over time?
[AD] : Data volume is fixed as it is a batch job.

4. What is the memory that you are having in your executors, and drivers?
[AD] We run one core node and 50 task nodes, i.e. 51 nodes in total; each
node can create 2 executors (2-core CPU and 8 GB memory).

5. Can you show the list of transformations that you are running?
[AD] No explicit transformations other than the basic map transformations required
to create the dataframe from the avro record RDD.

Please let me know if you have any questions.

Regards,
Anil

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Wednesday, March 2, 2022 at 1:07 AM
To: Yang,Jie(INF) <yangji...@baidu.com>
Cc: Anil Dasari <adas...@guidewire.com>, user@spark.apache.org
Subject: {EXT} Re: Spark Parquet write OOM
Hi Anil,

before jumping to the quick symptomatic fix, can we try to understand the 
issues?

1. What is the version of SPARK you are using?
2. Are you doing a lot of in-memory transformations like adding columns, or 
running joins, or UDFs thus increasing the size of the data before writing out?
3. Is your pipeline going to change or evolve soon, or the data volumes going 
to vary, or particularly increase, over time?
4. What is the memory that you are having in your executors, and drivers?
5. Can you show the list of transformations that you are running ?




Regards,
Gourav Sengupta


On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <yangji...@baidu.com> wrote:
This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase the
capacity of the DirectByteBuffer by configuring `-XX:MaxDirectMemorySize`, which
is a Java option.

However, we'd better check the length of memory to be allocated, because
`-XX:MaxDirectMemorySize` and `-Xmx` have the same capacity by default.
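
In Spark terms, that JVM option can be applied through the extra-JVM-options
settings (a sketch; the 2g value is illustrative, not a recommendation):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=2g")
  .set("spark.driver.extraJavaOptions", "-XX:MaxDirectMemorySize=2g")
```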


From: Anil Dasari <adas...@guidewire.com>
Date: Wednesday, March 2, 2022, 09:45

Re: {EXT} Re: Spark Parquet write OOM

2022-03-03 Thread Anil Dasari
Answers in the context. Thanks.

From: Gourav Sengupta
Date: Thursday, March 3, 2022 at 12:13 AM
To: Anil Dasari
Cc: Yang,Jie(INF), user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Anil,

I was trying to work out things for a while yesterday, but may need your kind 
help.

Can you please share the code for the following steps?
*  Create DF from hive (from step #c)
[AD] sparkSession.table()

*  Deduplicate spark DF by primary key
[AD] dataFrame.dropDuplicates()

*  Write DF to s3 in parquet format
[AD] dataFrame.write.mode(saveMode).parquet(path)

*  Write metadata to s3
[AD] metadata in json written to s3 using aws sdk

Regards,
Gourav Sengupta

On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari <adas...@guidewire.com> wrote:
2nd attempt..

Any suggestions to troubleshoot and fix the problem ? thanks in advance.

Regards,
Anil

From: Anil Dasari <adas...@guidewire.com>
Date: Wednesday, March 2, 2022 at 7:00 AM
To: Gourav Sengupta <gourav.sengu...@gmail.com>, Yang,Jie(INF) <yangji...@baidu.com>
Cc: user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Gourav and Yang,
Thanks for the response.

Please find the answers below.

1. What is the version of SPARK you are using?
[AD] : Spark 2.4.7 (EMR 5.33.1)

2. Are you doing a lot of in-memory transformations like adding columns, or
running joins, or UDFs thus increasing the size of the data before writing out?
[AD] No. Only one new column is added. Our flow is:

  1.  Read avro data from kafka
  2.  Avro deserialization and add the new column to the RDD
  3.  Create a spark dataframe (DF) against the latest schema (avro evolved
schema) and persist it to hive (checkpointing)
  4.  Create a DF from hive (from step #c)
  5.  Deduplicate the spark DF by primary key
  6.  Write the DF to s3 in parquet format
  7.  Write metadata to s3

The failure is from the spark batch job.

3. Is your pipeline going to change or evolve soon, or the data volumes going
to vary, or particularly increase, over time?
[AD] : Data volume is fixed as it is a batch job.

4. What is the memory that you are having in your executors, and drivers?
[AD] We run one core node and 50 task nodes, i.e. 51 nodes in total; each
node can create 2 executors (2-core CPU and 8 GB memory).

5. Can you show the list of transformations that you are running?
[AD] No explicit transformations other than the basic map transformations required
to create the dataframe from the avro record RDD.

Please let me know if you have any questions.

Regards,
Anil

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Wednesday, March 2, 2022 at 1:07 AM
To: Yang,Jie(INF) <yangji...@baidu.com>
Cc: Anil Dasari <adas...@guidewire.com>, user@spark.apache.org
Subject: {EXT} Re: Spark Parquet write OOM
Hi Anil,

before jumping to the quick symptomatic fix, can we try to understand the 
issues?

1. What is the version of SPARK you are using?
2. Are you doing a lot of in-memory transformations like adding columns, or 
running joins, or UDFs thus increasing the size of the data before writing out?
3. Is your pipeline going to change or evolve soon, or the data volumes going 
to vary, or particularly increase, over time?
4. What is the memory that you are having in your executors, and drivers?
5. Can you show the list of transformations that you are running ?




Regards,
Gourav Sengupta


On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <yangji...@baidu.com> wrote:
This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase the
capacity of the DirectByteBuffer by configuring `-XX:MaxDirectMemorySize`, which
is a Java option.

However, we'd better check the length of memory to be allocated, because
`-XX:MaxDirectMemorySize` and `-Xmx` have the same capacity by default.


From: Anil Dasari <adas...@guidewire.com>
Date: Wednesday, March 2, 2022, 09:45
To: user@spark.apache.org
Subject: Spark Parquet write OOM

Hello everyone,

We are writing a Spark dataframe to s3 in parquet format and it is failing with
the exception below.

I wanted to try the following to avoid the OOM:

  1.  Increase the default sql shuffle partitions to reduce the load on parquet
writer tasks, and
  2.  Increase user memory (reduce the memory fraction) to leave more memory for
other data structures, assuming the parquet writer uses user memory.

I am not sure if these fix the OOM issue, so I wanted to reach out to the community
for any suggestions. Please let me know.
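
For reference, the two ideas above map to these settings (a sketch; values are
illustrative, not tuned recommendations):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // 1. More shuffle partitions -> fewer rows per parquet writer task.
  .set("spark.sql.shuffle.partitions", "1000") // default is 200
  // 2. Lower unified-memory fraction -> more "user" memory left over.
  .set("spark.memory.fraction", "0.5")         // default is 0.6
```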

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:

Re: {EXT} Re: Spark Parquet write OOM

2022-03-02 Thread Anil Dasari
2nd attempt..

Any suggestions to troubleshoot and fix the problem ? thanks in advance.

Regards,
Anil

From: Anil Dasari
Date: Wednesday, March 2, 2022 at 7:00 AM
To: Gourav Sengupta, Yang,Jie(INF)
Cc: user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Gourav and Yang,
Thanks for the response.

Please find the answers below.

1. What is the version of SPARK you are using?
[AD] : Spark 2.4.7 (EMR 5.33.1)

2. Are you doing a lot of in-memory transformations like adding columns, or
running joins, or UDFs thus increasing the size of the data before writing out?
[AD] No. Only one new column is added. Our flow is:

  1.  Read avro data from kafka
  2.  Avro deserialization and add the new column to the RDD
  3.  Create a spark dataframe (DF) against the latest schema (avro evolved
schema) and persist it to hive (checkpointing)
  4.  Create a DF from hive (from step #c)
  5.  Deduplicate the spark DF by primary key
  6.  Write the DF to s3 in parquet format
  7.  Write metadata to s3

The failure is from the spark batch job.

3. Is your pipeline going to change or evolve soon, or the data volumes going
to vary, or particularly increase, over time?
[AD] : Data volume is fixed as it is a batch job.

4. What is the memory that you are having in your executors, and drivers?
[AD] We run one core node and 50 task nodes, i.e. 51 nodes in total; each
node can create 2 executors (2-core CPU and 8 GB memory).

5. Can you show the list of transformations that you are running?
[AD] No explicit transformations other than the basic map transformations required
to create the dataframe from the avro record RDD.

Please let me know if you have any questions.

Regards,
Anil

From: Gourav Sengupta
Date: Wednesday, March 2, 2022 at 1:07 AM
To: Yang,Jie(INF)
Cc: Anil Dasari, user@spark.apache.org
Subject: {EXT} Re: Spark Parquet write OOM
Hi Anil,

before jumping to the quick symptomatic fix, can we try to understand the 
issues?

1. What is the version of SPARK you are using?
2. Are you doing a lot of in-memory transformations like adding columns, or 
running joins, or UDFs thus increasing the size of the data before writing out?
3. Is your pipeline going to change or evolve soon, or the data volumes going 
to vary, or particularly increase, over time?
4. What is the memory that you are having in your executors, and drivers?
5. Can you show the list of transformations that you are running ?




Regards,
Gourav Sengupta


On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <yangji...@baidu.com> wrote:
This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase the
capacity of the DirectByteBuffer by configuring `-XX:MaxDirectMemorySize`, which
is a Java option.

However, we'd better check the length of memory to be allocated, because
`-XX:MaxDirectMemorySize` and `-Xmx` have the same capacity by default.


From: Anil Dasari <adas...@guidewire.com>
Date: Wednesday, March 2, 2022, 09:45
To: user@spark.apache.org
Subject: Spark Parquet write OOM

Hello everyone,

We are writing a Spark dataframe to s3 in parquet format and it is failing with
the exception below.

I wanted to try the following to avoid the OOM:

  1.  Increase the default sql shuffle partitions to reduce the load on parquet
writer tasks, and
  2.  Increase user memory (reduce the memory fraction) to leave more memory for
other data structures, assuming the parquet writer uses user memory.

I am not sure if these fix the OOM issue, so I wanted to reach out to the community
for any suggestions. Please let me know.

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
 at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
 at sun.misc.Unsafe.allocateMemory(Native Method)
 at
java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
 at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)

Re: {EXT} Re: Spark Parquet write OOM

2022-03-02 Thread Anil Dasari
Hi Gourav and Yang,
Thanks for the response.

Please find the answers below.

1. What is the version of SPARK you are using?
[AD] : Spark 2.4.7 (EMR 5.33.1)

2. Are you doing a lot of in-memory transformations like adding columns, or
running joins, or UDFs thus increasing the size of the data before writing out?
[AD] No. Only one new column is added. Our flow is:

  1.  Read avro data from kafka
  2.  Avro deserialization and add the new column to the RDD
  3.  Create a spark dataframe (DF) against the latest schema (avro evolved
schema) and persist it to hive (checkpointing)
  4.  Create a DF from hive (from step #c)
  5.  Deduplicate the spark DF by primary key
  6.  Write the DF to s3 in parquet format
  7.  Write metadata to s3

The failure is from the spark batch job.

3. Is your pipeline going to change or evolve soon, or the data volumes going
to vary, or particularly increase, over time?
[AD] : Data volume is fixed as it is a batch job.

4. What is the memory that you are having in your executors, and drivers?
[AD] We run one core node and 50 task nodes, i.e. 51 nodes in total; each
node can create 2 executors (2-core CPU and 8 GB memory).

5. Can you show the list of transformations that you are running?
[AD] No explicit transformations other than the basic map transformations required
to create the dataframe from the avro record RDD.

Please let me know if you have any questions.

Regards,
Anil

From: Gourav Sengupta
Date: Wednesday, March 2, 2022 at 1:07 AM
To: Yang,Jie(INF)
Cc: Anil Dasari, user@spark.apache.org
Subject: {EXT} Re: Spark Parquet write OOM
Hi Anil,

before jumping to the quick symptomatic fix, can we try to understand the 
issues?

1. What is the version of SPARK you are using?
2. Are you doing a lot of in-memory transformations like adding columns, or 
running joins, or UDFs thus increasing the size of the data before writing out?
3. Is your pipeline going to change or evolve soon, or the data volumes going 
to vary, or particularly increase, over time?
4. What is the memory that you are having in your executors, and drivers?
5. Can you show the list of transformations that you are running ?




Regards,
Gourav Sengupta


On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <yangji...@baidu.com> wrote:
This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase the
capacity of the DirectByteBuffer by configuring `-XX:MaxDirectMemorySize`, which
is a Java option.

However, we'd better check the length of memory to be allocated, because
`-XX:MaxDirectMemorySize` and `-Xmx` have the same capacity by default.


From: Anil Dasari <adas...@guidewire.com>
Date: Wednesday, March 2, 2022, 09:45
To: user@spark.apache.org
Subject: Spark Parquet write OOM

Hello everyone,

We are writing a Spark dataframe to s3 in parquet format and it is failing with
the exception below.

I wanted to try the following to avoid the OOM:

  1.  Increase the default sql shuffle partitions to reduce the load on parquet
writer tasks, and
  2.  Increase user memory (reduce the memory fraction) to leave more memory for
other data structures, assuming the parquet writer uses user memory.

I am not sure if these fix the OOM issue, so I wanted to reach out to the community
for any suggestions. Please let me know.

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
 at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
 at sun.misc.Unsafe.allocateMemory(Native Method)
 at
java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
 at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
 at 
org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
 at 
org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
 at 
org.apache.parquet.bytes.Cap

Spark Parquet write OOM

2022-03-01 Thread Anil Dasari
Hello everyone,

We are writing a Spark dataframe to s3 in parquet format and it is failing with
the exception below.

I wanted to try the following to avoid the OOM:

  1.  Increase the default sql shuffle partitions to reduce the load on parquet
writer tasks, and
  2.  Increase user memory (reduce the memory fraction) to leave more memory for
other data structures, assuming the parquet writer uses user memory.

I am not sure if these fix the OOM issue, so I wanted to reach out to the community
for any suggestions. Please let me know.

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
 at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
 at sun.misc.Unsafe.allocateMemory(Native Method)
 at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
 at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
 at 
org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
 at 
org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
 at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
 at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
 at 
org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
 at 
org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
 at 
org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
 at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
 at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
 at 
org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
 at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
 at 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
 at 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
 at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
 ... 10 more
 Suppressed: java.io.IOException: The file being written is in an 
invalid state. Probably caused by an error thrown previously. Current state: 
BLOCK
 at 
org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
 at 
org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
 at 

Spark 3.0 plugins

2021-12-19 Thread Anil Dasari
Hello everyone,

I was going through the "Apache Spark Performance Monitoring in Spark
3.0" talk and wanted to collect IO
metrics for my spark application.
I couldn't find Spark 3.0 built-in plugins for IO metrics like
https://github.com/cerndb/SparkPlugins in the Spark 3 documentation. Does the Spark 3
bundle have built-in IO metric plugins ? Thanks in advance.
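
To my knowledge the Spark 3 bundle does not ship IO metric plugins; third-party
ones like cerndb/SparkPlugins are built on the plugin API that ships with
Spark 3. A minimal sketch of that API (the class and metric names here are
hypothetical):

```scala
import java.util.{Map => JMap}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Registered with: spark-submit --conf spark.plugins=com.example.MyIOMetricsPlugin
class MyIOMetricsPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null // no driver-side component

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // Metrics registered here surface through Spark's metrics system.
      ctx.metricRegistry().counter("myIOBytes")
    }
  }
}
```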

Regards,
Anil



Re: Spark Pair RDD write to Hive

2021-09-06 Thread Anil Dasari
2nd try

From: Anil Dasari 
Date: Sunday, September 5, 2021 at 10:42 AM
To: "user@spark.apache.org" 
Subject: Spark Pair RDD write to Hive

Hello,

I have a use case where users, grouped by group id, are persisted to Hive tables.

// pseudo code looks like below
val usersRDD = sc.parallelize(..)
val usersPairRDD = usersRDD.map(u => (u.groupId, u))
val groupedUsers = usersPairRDD.groupByKey()

Can I save the groupedUsers RDD into Hive tables where the table name is the key of
each groupedUsers entry ?

I want to avoid the approach below, as it is not a scalable solution: parallelism is
limited by the driver cores –

val groupIds = usersRDD.map(u => u.groupId).distinct.collect.toList

groupIds.par.map(id => {
  val rdd = usersRDD.filter(u => u.groupId == id).cache
  // create dataframe
  // persist df to hive table using df.write.saveAsTable
})

Could you suggest a better approach ? Thanks in advance.
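
For what it's worth, if one table partitioned by group (rather than one table
per group) is acceptable, a single distributed write avoids the driver-side
loop entirely. A sketch (the table name is a placeholder, and toDF() assumes
the users are case-class instances):

```scala
import org.apache.spark.sql.SaveMode
import spark.implicits._

val usersDF = usersRDD.toDF() // assumes a case class with a groupId field
usersDF.write
  .mode(SaveMode.Overwrite)
  .partitionBy("groupId")
  .saveAsTable("users_by_group") // hypothetical table name
```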

-
Anil


Spark Pair RDD write to Hive

2021-09-05 Thread Anil Dasari
Hello,

I have a use case where users, grouped by group id, are persisted to Hive tables.

// pseudo code looks like below
val usersRDD = sc.parallelize(..)
val usersPairRDD = usersRDD.map(u => (u.groupId, u))
val groupedUsers = usersPairRDD.groupByKey()

Can I save the groupedUsers RDD into Hive tables where the table name is the key of
each groupedUsers entry ?

I want to avoid the approach below, as it is not a scalable solution: parallelism is
limited by the driver cores –

val groupIds = usersRDD.map(u => u.groupId).distinct.collect.toList

groupIds.par.map(id => {
  val rdd = usersRDD.filter(u => u.groupId == id).cache
  // create dataframe
  // persist df to hive table using df.write.saveAsTable
})

Could you suggest a better approach ? Thanks in advance.

-
Anil


Shutdown Spark application with failed state

2021-07-26 Thread Anil Dasari
Hello all,

I am using Spark 2.x streaming with kafka.
I noticed that Spark Streaming processes subsequent micro-batches in case
of a failure, as it takes a while to notify the driver about the error and
interrupt the streaming-executor thread. This creates a problem, as we are
checkpointing the offsets internally.

To avoid the problem, we wanted to catch the exception in the RDD processing and
stop Spark Streaming immediately.

streamRDD.foreachRDD { (rdd, microBatchTime) =>
  try {
    // business logic
  } catch {
    case ex: Exception =>
      // stop spark streaming
      streamingContext.stop(stopSparkContext = true, stopGracefully = false)
  }
}

But the Spark application state is set to Completed, so the application is not
restarted automatically by Spark (with the max attempts config).

I checked whether there is a way to report the error during shutdown so that the
Spark application status is set to Failed. ContextWaiter#notifyError is streaming
package scoped, and I couldn't find any other interface to propagate the
error/exception when stopping the process.

How do I tell Spark Streaming to stop processing subsequent micro-batches if a
micro-batch throws an exception ? Is it possible to configure Spark to create
one micro-batch RDD at a time ?
How do I stop the Spark Streaming context with an error ?
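
One workaround to consider (a sketch, not an official API; it relies on the
cluster manager treating a non-zero driver exit as a failed attempt):

```scala
@volatile var batchError: Option[Throwable] = None

streamRDD.foreachRDD { (rdd, microBatchTime) =>
  try {
    // business logic
  } catch {
    case ex: Exception =>
      batchError = Some(ex)
      streamingContext.stop(stopSparkContext = true, stopGracefully = false)
  }
}

streamingContext.start()
streamingContext.awaitTermination()
// Rethrow on the main thread so the driver exits with a failure instead of
// letting the application be marked Completed.
batchError.foreach(ex => throw ex)
```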

Any help would be appreciated. Thanks in advance.

Regards.



Shutdown Spark application with failed state

2021-07-26 Thread Anil Dasari
Hi Team,

I am using Spark 2.x streaming with kafka.
I noticed that Spark Streaming processes subsequent micro-batches in case
of a failure, as it takes a while to notify the driver about the error and
interrupt the streaming-executor thread. This creates a problem, as we are
checkpointing the offsets internally.

To avoid the problem, we wanted to catch the exception in the RDD processing and
stop Spark Streaming immediately.

streamRDD.foreachRDD { (rdd, microBatchTime) =>
  try {
    // business logic
  } catch {
    case ex: Exception =>
      // stop spark streaming
      streamingContext.stop(stopSparkContext = true, stopGracefully = false)
  }
}

But the Spark application state is set to Completed, so the application is not
restarted automatically by Spark (with the max attempts config).

I checked whether there is a way to report the error during shutdown so that the
Spark application status is set to Failed. ContextWaiter#notifyError is streaming
package scoped, and I couldn't find any other interface to propagate the
error/exception when stopping the process.

How do I tell Spark Streaming to stop processing subsequent micro-batches if a
micro-batch throws an exception ? Is it possible to configure Spark to create
one micro-batch RDD at a time ?
How do I stop the Spark Streaming context with an error ?

Any help would be appreciated. Thanks in advance.

Regards.