[jira] [Commented] (SPARK-48418) Spark structured streaming: Add microbatch timestamp to foreachBatch method

2024-05-25 Thread Anil Dasari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849465#comment-17849465
 ] 

Anil Dasari commented on SPARK-48418:
-

Using the current time as the batch time inside foreachBatch might not work, because 
that timestamp is not available in the query progress and therefore cannot be mapped 
back to the microbatch.
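For illustration, a minimal sketch of that workaround, assuming the caller simply samples the wall clock inside the callback; the captured value reflects when the callback ran, not when the trigger fired, and it never appears in StreamingQueryProgress, so it cannot be correlated with the batch afterwards:
{code:java}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("batch-time-workaround").getOrCreate()
val inputStream = spark.readStream.format("rate").load

// Hypothetical workaround: approximate the batch time inside foreachBatch.
def handleBatch(ds: Dataset[Row], batchId: Long): Unit = {
  val approxBatchTimeMs = System.currentTimeMillis() // callback time, not trigger time
  // application logic keyed on (batchId, approxBatchTimeMs)
}

inputStream.writeStream
  .trigger(Trigger.ProcessingTime(10 * 1000))
  .foreachBatch(handleBatch _)
  .start()
  .awaitTermination()
{code}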

> Spark structured streaming: Add microbatch timestamp to foreachBatch method
> ---
>
> Key: SPARK-48418
> URL: https://issues.apache.org/jira/browse/SPARK-48418
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.5.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are on Spark 3.x, using Spark DStream + Kafka, and planning to move to 
> Structured Streaming + Kafka.
> Differences between the DStream microbatch metadata and the Structured 
> Streaming microbatch metadata are making the migration difficult. 
> DStream#foreachRDD provides both the microbatch RDD and its start timestamp 
> (as a long). However, Structured Streaming's Dataset#foreachBatch provides only 
> the microbatch dataset and the batchId, which is a numeric identifier. 
> The microbatch start time is used across our data pipelines and in the final result. 
> Could you add the microbatch start timestamp to the Dataset#foreachBatch method?
> Pseudo code :
>  
> {code:java}
> val inputStream = sparkSession.readStream.format("rate").load
> inputStream
>   .writeStream
>   .trigger(Trigger.ProcessingTime(10 * 1000))
>   .foreachBatch {
>     // batchTime is the microbatch trigger/start timestamp
>     (ds: Dataset[Row], batchId: Long, batchTime: Long) =>
>       // application logic
>   }
>   .start()
>   .awaitTermination() {code}
>  
> Proposed implementation, where batchTime is the time at which the trigger 
> executor executed the batch:
> (`currentTriggerStartTimestamp` could also be used as the batch time. The trigger 
> executor time is the source of the microbatch and can also easily be added to the 
> query progress event.)
> 1. Add trigger time to 
> [TriggerExecutor|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L31]
>  
> {code:java}
> trait TriggerExecutor {
>   // added batchTime (Long) argument
>   def execute(batchRunner: (MicroBatchExecutionContext, Long) => Boolean): Unit
>   ... // (other methods)
> }{code}
> 2. Update ProcessingTimeExecutor and other executors to pass trigger time.
> {code:java}
> override def execute(
>     triggerHandler: (MicroBatchExecutionContext, Long) => Boolean): Unit = {
>   while (true) {
>     val triggerTimeMs = clock.getTimeMillis()
>     val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
> 
>     // pass triggerTimeMs to runOneBatch, which invokes triggerHandler and is
>     // used in MicroBatchExecution#runActivatedStream
>     val terminated = !runOneBatch(triggerHandler, triggerTimeMs)
> 
>     ...
>   }
> } {code}
> 3. Add an executionTime (Long) argument to the MicroBatchExecution#executeOneBatch 
> method 
> [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L330]
> 4. Pass the execution time in 
> [runBatch|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L380C11-L380C19]
>  and 
> [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L849]
> 5. Finally, add the following `foreachBatch` method to `DataStreamWriter`, update 
> the existing `foreachBatch` methods to accept the new argument, and also add it to 
> the query progress event. 
> {code:java}
> def foreachBatch(function: (Dataset[T], Long, Long) => Unit): DataStreamWriter[T] = {
>   this.source = SOURCE_NAME_FOREACH_BATCH
>   if (function == null) {
>     throw new IllegalArgumentException("foreachBatch function cannot be null")
>   }
>   this.foreachBatchWriter = function
>   this
> }{code}
> Let me know your thoughts. 





[jira] [Updated] (SPARK-48418) Spark structured streaming: Add microbatch timestamp to foreachBatch method

2024-05-24 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated SPARK-48418:

Description: 
We are on Spark 3.x, using Spark DStream + Kafka, and planning to move to 
Structured Streaming + Kafka.

Differences between the DStream microbatch metadata and the Structured Streaming 
microbatch metadata are making the migration difficult. 

DStream#foreachRDD provides both the microbatch RDD and its start timestamp (as a 
long). However, Structured Streaming's Dataset#foreachBatch provides only the 
microbatch dataset and the batchId, which is a numeric identifier. 

The microbatch start time is used across our data pipelines and in the final result. 

Could you add the microbatch start timestamp to the Dataset#foreachBatch method?

Pseudo code :

 
{code:java}
val inputStream = sparkSession.readStream.format("rate").load

inputStream
  .writeStream
  .trigger(Trigger.ProcessingTime(10 * 1000))
  .foreachBatch {
    // batchTime is the microbatch trigger/start timestamp
    (ds: Dataset[Row], batchId: Long, batchTime: Long) =>
      // application logic
  }
  .start()
  .awaitTermination() {code}
 

Proposed implementation, where batchTime is the time at which the trigger executor 
executed the batch:

(`currentTriggerStartTimestamp` could also be used as the batch time. The trigger 
executor time is the source of the microbatch and can also easily be added to the 
query progress event.)

1. Add trigger time to 
[TriggerExecutor|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L31]
 
{code:java}
trait TriggerExecutor {
  // added batchTime (Long) argument 
  def execute(batchRunner: (MicroBatchExecutionContext, Long) => Boolean): Unit 

  ... // (other methods)
}{code}
2. Update ProcessingTimeExecutor and other executors to pass trigger time.
{code:java}
override def execute(
    triggerHandler: (MicroBatchExecutionContext, Long) => Boolean): Unit = {
  while (true) {
    val triggerTimeMs = clock.getTimeMillis()
    val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)

    // pass triggerTimeMs to runOneBatch, which invokes triggerHandler and is
    // used in MicroBatchExecution#runActivatedStream
    val terminated = !runOneBatch(triggerHandler, triggerTimeMs)

    ...
  }
} {code}
3. Add an executionTime (Long) argument to the MicroBatchExecution#executeOneBatch 
method 
[here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L330]

4. Pass the execution time in 
[runBatch|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L380C11-L380C19]
 and 
[here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L849]

5. Finally, add the following `foreachBatch` method to `DataStreamWriter`, update the 
existing `foreachBatch` methods to accept the new argument, and also add it to the 
query progress event. A toy end-to-end sketch follows the snippet below. 
{code:java}
def foreachBatch(function: (Dataset[T], Long, Long) => Unit): DataStreamWriter[T] = {
  this.source = SOURCE_NAME_FOREACH_BATCH
  if (function == null) {
    throw new IllegalArgumentException("foreachBatch function cannot be null")
  }
  this.foreachBatchWriter = function
  this
}{code}
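For illustration only, a self-contained toy sketch (plain Scala, not actual Spark internals) of the threading described in steps 1-5: the scheduling loop owns the trigger timestamp and passes it through to the per-batch callback, which is what the proposed three-argument foreachBatch would expose. All names below are hypothetical stand-ins.
{code:java}
object BatchTimePlumbingSketch {
  // Stand-in for the trigger executor (steps 1-2): it captures the trigger time.
  def runLoop(intervalMs: Long, batches: Int)(handler: (Long, Long) => Unit): Unit = {
    var batchId = 0L
    while (batchId < batches) {
      val triggerTimeMs = System.currentTimeMillis() // captured when the trigger fires
      handler(batchId, triggerTimeMs)                // stand-in for runBatch (steps 3-4)
      batchId += 1
      Thread.sleep(intervalMs)
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the proposed foreachBatch(ds, batchId, batchTime) callback (step 5).
    runLoop(intervalMs = 1000, batches = 3) { (batchId, batchTimeMs) =>
      println(s"batch $batchId triggered at $batchTimeMs")
    }
  }
}
{code}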
Let me know your thoughts. 

  was:
We are on Spark 3.x, using Spark DStream + Kafka, and planning to move to 
Structured Streaming + Kafka.

Differences between the DStream microbatch metadata and the Structured Streaming 
microbatch metadata are making the migration difficult. 

DStream#foreachRDD provides both the microbatch RDD and its start timestamp (as a 
long). However, Structured Streaming's Dataset#foreachBatch provides only the 
microbatch dataset and the batchId, which is a numeric identifier. 

The microbatch start time is used across our data pipelines and in the final result. 

Could you add the microbatch start timestamp to the Dataset#foreachBatch method?

Pseudo code :

 
{code:java}
val inputStream = sparkSession.readStream.format("rate").load

inputStream
  .writeStream
  .trigger(Trigger.ProcessingTime(10 * 1000))
  .foreachBatch {
(ds: Dataset[Row], batchId: Long, batchTime: Long) => // batchTime is 
microbatch triggered/start timestamp
  
 // application logic.
  }
  .start()
  .awaitTermination() {code}
 

Implementation approach when batchTime is trigger executor executed time:

( `currentTriggerStartTimestamp` can be used as well as batch time. Trigger 
executor time is source of microbatch and also can be easily added to query 
processor event as well)
 # Add trigger time to 
[TriggerExecutor|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L31]
 
{code:java}
trait TriggerExecutor {
  // added batchTime (Long) argument 
  def execute(batchRunner: (MicroBatchExecutionContext, Long) => Boolean): Unit 

  ... // (other methods)
}{code}

 # 

[jira] [Created] (SPARK-48418) Spark structured streaming: Add microbatch timestamp to foreachBatch method

2024-05-24 Thread Anil Dasari (Jira)
Anil Dasari created SPARK-48418:
---

 Summary: Spark structured streaming: Add microbatch timestamp to 
foreachBatch method
 Key: SPARK-48418
 URL: https://issues.apache.org/jira/browse/SPARK-48418
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.5.1
Reporter: Anil Dasari


We are on Spark 3.x, using Spark DStream + Kafka, and planning to move to 
Structured Streaming + Kafka.

Differences between the DStream microbatch metadata and the Structured Streaming 
microbatch metadata are making the migration difficult. 

DStream#foreachRDD provides both the microbatch RDD and its start timestamp (as a 
long). However, Structured Streaming's Dataset#foreachBatch provides only the 
microbatch dataset and the batchId, which is a numeric identifier. 

The microbatch start time is used across our data pipelines and in the final result. 

Could you add the microbatch start timestamp to the Dataset#foreachBatch method?

Pseudo code :

 
{code:java}
val inputStream = sparkSession.readStream.format("rate").load

inputStream
  .writeStream
  .trigger(Trigger.ProcessingTime(10 * 1000))
  .foreachBatch {
    // batchTime is the microbatch trigger/start timestamp
    (ds: Dataset[Row], batchId: Long, batchTime: Long) =>
      // application logic
  }
  .start()
  .awaitTermination() {code}
 

Proposed implementation, where batchTime is the time at which the trigger executor 
executed the batch:

(`currentTriggerStartTimestamp` could also be used as the batch time. The trigger 
executor time is the source of the microbatch and can also easily be added to the 
query progress event.)
 # Add trigger time to 
[TriggerExecutor|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L31]
 
{code:java}
trait TriggerExecutor {
  // added batchTime (Long) argument 
  def execute(batchRunner: (MicroBatchExecutionContext, Long) => Boolean): Unit 

  ... // (other methods)
}{code}

 # Update ProcessingTimeExecutor and other executors to pass trigger time.
{code:java}
override def execute(
    triggerHandler: (MicroBatchExecutionContext, Long) => Boolean): Unit = {
  while (true) {
    val triggerTimeMs = clock.getTimeMillis()
    val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)

    // pass triggerTimeMs to runOneBatch, which invokes triggerHandler and is
    // used in MicroBatchExecution#runActivatedStream
    val terminated = !runOneBatch(triggerHandler, triggerTimeMs)

    ...
  }
} {code}
 

 # Add an executionTime (Long) argument to the MicroBatchExecution#executeOneBatch 
method 
[here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L330]
 # Pass the execution time in 
[runBatch|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L380C11-L380C19]
 and 
[here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L849]
 # Finally, add the following `foreachBatch` method to `DataStreamWriter`, update the 
existing `foreachBatch` methods to accept the new argument, and also add it to the 
query progress event. 
{code:java}
def foreachBatch(function: (Dataset[T], Long, Long) => Unit): DataStreamWriter[T] = {
  this.source = SOURCE_NAME_FOREACH_BATCH
  if (function == null) {
    throw new IllegalArgumentException("foreachBatch function cannot be null")
  }
  this.foreachBatchWriter = function
  this
}{code}

 





[jira] [Created] (SPARK-44224) Table drop with Purge statement is not deleting the _temporary folder

2023-06-28 Thread Anil Dasari (Jira)
Anil Dasari created SPARK-44224:
---

 Summary: Table drop with Purge statement is not deleting the 
_temporary folder
 Key: SPARK-44224
 URL: https://issues.apache.org/jira/browse/SPARK-44224
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.1
Reporter: Anil Dasari


When saveAsTable fails for some reason, the `_temporary` folder it created is not 
deleted by a subsequent DROP TABLE ... PURGE statement. 

At a high level, our data process looks like the following (a minimal repro sketch is 
included at the end of this description):
 # Read data from the external store using load 
 # Drop the table with PURGE in case the table already exists
 # Write the DataFrame to Hive using 
dataframe.write.mode(SaveMode.Overwrite).saveAsTable("my_table")

When step 3 fails for some reason and the job is restarted because of the YARN 
maxAttempts setting, the drop in step 2 does not delete the _temporary folder created 
by step 3, which causes the job to fail with the exception below:

org.apache.spark.sql.AnalysisException: Can not create the managed 
table("my_table"). The associated location('') already exists.

This was never an issue in Spark 2 because of the 
"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation" config, which was 
removed in Spark 3.
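A minimal sketch of the pipeline described above, assuming a Hive-enabled session; the source path and table name are placeholders:
{code:java}
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("drop-purge-temporary-repro")
  .enableHiveSupport()
  .getOrCreate()

// 1. Read data from the external store (placeholder source and path).
val df = spark.read.format("parquet").load("/path/to/external/store")

// 2. Drop the table with PURGE in case it already exists.
spark.sql("DROP TABLE IF EXISTS my_table PURGE")

// 3. Write the DataFrame to Hive. If this step dies mid-write, a `_temporary`
//    directory is left under the table location; on retry, step 2 does not remove
//    it, and the write fails with "Can not create the managed table ... The
//    associated location already exists".
df.write.mode(SaveMode.Overwrite).saveAsTable("my_table")
{code}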

 





[jira] [Closed] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new mircobatch

2023-06-25 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari closed SPARK-44083.
---

> Spark streaming: Add max pending microbatches conf to skip scheduling new 
> mircobatch
> 
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 3.4.0
>Reporter: Anil Dasari
>Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will 
> continue to add microbatches to the event loop and submit the jobs to the job 
> thread executor. Consequently, pending microbatches hold fewer offset ranges in 
> Spark Streaming + Kafka if the Kafka lag is less than the configured maximum per 
> partition. 
> We rely on a third-party service to add additional metadata to incoming records, 
> and its response times remain constant regardless of microbatch size, so small 
> microbatches can increase latencies further. In our case, an RDD's metadata is 
> fetched during the transform phase for various reasons, and the transform is 
> executed when the microbatch is scheduled. Our RDD transform at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>  {   
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMedatada(uniqueItems)
>   val rddWithMedatadata = rdd.map(...) // adds metadata  
>
>   rddWithMedatadata
>  })
> {code}
>  
> Scheduling many small microbatches can be avoided by skipping new jobs when 
> there are sufficient pending jobs in the queue. 
> Proposed changes in _JobExecutor.scala_ on high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)
>
> private def processEvent(event: JobGeneratorEvent): Unit = {
>   logDebug("Got event " + event)
>   event match {
>     case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
>         generateJobs(time)
>       } else {
>         // TODO: add the pending times in the queue to the log
>         logWarning("Skipping JobGenerator at " + time)
>       }
>
>     // other current cases
>     case ...
>   }
> } {code}
>  





[jira] [Resolved] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new mircobatch

2023-06-25 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari resolved SPARK-44083.
-
Resolution: Won't Do

> Spark streaming: Add max pending microbatches conf to skip scheduling new 
> mircobatch
> 
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 3.4.0
>Reporter: Anil Dasari
>Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will 
> continue to add microbatches to the event loop and submit the jobs to the job 
> thread executor. Consequently, pending microbatches hold fewer offset ranges in 
> Spark Streaming + Kafka if the Kafka lag is less than the configured maximum per 
> partition. 
> We rely on a third-party service to add additional metadata to incoming records, 
> and its response times remain constant regardless of microbatch size, so small 
> microbatches can increase latencies further. In our case, an RDD's metadata is 
> fetched during the transform phase for various reasons, and the transform is 
> executed when the microbatch is scheduled. Our RDD transform at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>  {   
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMedatada(uniqueItems)
>   val rddWithMedatadata = rdd.map(...) // adds metadata  
>
>   rddWithMedatadata
>  })
> {code}
>  
> Scheduling many small microbatches can be avoided by skipping new jobs when 
> there are sufficient pending jobs in the queue. 
> Proposed changes in _JobExecutor.scala_ on high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", 
> -1)
> private def processEvent(event: JobGeneratorEvent): Unit = {
>  logDebug("Got event " + event)
>  event match {
>   case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < 
> maxPendingJobs){ 
>             generateJobs(time)
>        }else { 
>          logWarning("Skipping JobGenerator at " + time)   // TODO: add 
> pending times in queue to log.   
>   }
>   
>   // other current cases
>   case ...
>    .
>  }
> } {code}
>  





[jira] [Commented] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new mircobatch

2023-06-25 Thread Anil Dasari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17736937#comment-17736937
 ] 

Anil Dasari commented on SPARK-44083:
-

[~kabhwan] Thanks for the details. I will close the Jira for now and will reopen it 
if I see similar behavior in Structured Streaming. 

> Spark streaming: Add max pending microbatches conf to skip scheduling new 
> mircobatch
> 
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 3.4.0
>Reporter: Anil Dasari
>Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will 
> continue to add microbatches to the event loop and submit the jobs to the job 
> thread executor. Consequently, pending microbatches hold fewer offset ranges in 
> Spark Streaming + Kafka if the Kafka lag is less than the configured maximum per 
> partition. 
> We rely on a third-party service to add additional metadata to incoming records, 
> and its response times remain constant regardless of microbatch size, so small 
> microbatches can increase latencies further. In our case, an RDD's metadata is 
> fetched during the transform phase for various reasons, and the transform is 
> executed when the microbatch is scheduled. Our RDD transform at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>  {   
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMedatada(uniqueItems)
>   val rddWithMedatadata = rdd.map(...) // adds metadata  
>
>   rddWithMedatadata
>  })
> {code}
>  
> Scheduling many small microbatches can be avoided by skipping new jobs when 
> there are sufficient pending jobs in the queue. 
> Proposed changes in _JobExecutor.scala_ on high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", 
> -1)
> private def processEvent(event: JobGeneratorEvent): Unit = {
>  logDebug("Got event " + event)
>  event match {
>   case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < 
> maxPendingJobs){ 
>             generateJobs(time)
>        }else { 
>          logWarning("Skipping JobGenerator at " + time)   // TODO: add 
> pending times in queue to log.   
>   }
>   
>   // other current cases
>   case ...
>    .
>  }
> } {code}
>  





[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new mircobatch

2023-06-16 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated SPARK-44083:

Description: 
In the case of uneven incoming rates and high scheduling delays, streaming will 
continue to add microbatches to the event loop and submit the jobs to the job thread 
executor. Consequently, pending microbatches hold fewer offset ranges in Spark 
Streaming + Kafka if the Kafka lag is less than the configured maximum per partition. 

We rely on a third-party service to add additional metadata to incoming records, and 
its response times remain constant regardless of microbatch size, so small 
microbatches can increase latencies further. In our case, an RDD's metadata is 
fetched during the transform phase for various reasons, and the transform is executed 
when the microbatch is scheduled. Our RDD transform at a high level:
{code:java}
val dstreams = ...
dstreams.transform(rdd =>
 {   
  val uniqueItems = rdd.map(..).distinct.collect
  val metadata = getMedatada(uniqueItems)
  val rddWithMedatadata = rdd.map(...) // adds metadata  
   
  rddWithMedatadata
 })
{code}
 

Scheduling many small microbatches can be avoided by skipping new jobs when 
there are sufficient pending jobs in the queue. 

Proposed changes in _JobExecutor.scala_ on high level:
{code:java}
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
        generateJobs(time)
      } else {
        // TODO: add the pending times in the queue to the log
        logWarning("Skipping JobGenerator at " + time)
      }

    // other current cases
    case ...
  }
} {code}
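For context, a short example of how the proposed configuration could be supplied to a DStreams application; note that spark.streaming.maxPendingBatches is the hypothetical key introduced by this ticket, not an existing Spark setting:
{code:java}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "spark.streaming.maxPendingBatches" is the *proposed* setting from this ticket.
val conf = new SparkConf()
  .setAppName("max-pending-batches-example")
  .set("spark.streaming.maxPendingBatches", "5") // skip job generation once 5 batches are pending

val ssc = new StreamingContext(conf, Seconds(10))
// ... build the DStream pipeline as usual, then start the context
ssc.start()
ssc.awaitTermination()
{code}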
 

  was:
In the case of uneven incoming rates and high scheduling delays, streaming will 
continue to add microbatches to the eventloop and submit the job to the job 
thread executor. Consequently, pending microbatches hold fewer offset ranges in 
Spark streaming Kafka if the kafka lag is less than the configured maximum per 
partition. 

We rely on the third-party service to add additional metadata to incoming 
records, and its response times remain constant regardless of microbatch size. 
An RDD's metadata is fetched during the transform phase in our case if various 
reasons, which is executed when micorbatch is scheduled. Our RDD transform on 
high level :
{code:java}
val dstreams = ...
dstreams.transform(rdd =>
 {   
  val uniqueItems = rdd.map(..).distinct.collect
  val metadata = getMedatada(uniqueItems)
  val rddWithMedatadata = rdd.map(...) // adds metadata  
   
  rddWithMedatadata
 })
{code}
 

Scheduling small microbatches can be avoided by skipping new jobs when there 
are sufficient pending jobs in the queue. 

 

Proposed changes in _JobExecutor.scala_ on high level:
{code:java}
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
 logDebug("Got event " + event)
 event match {
  case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < 
maxPendingJobs){ 
            generateJobs(time)
       }else { 
         logWarning("Skipping JobGenerator at " + time)   // TODO: add pending 
times in queue to log.   
  }
  
  // other current cases
  case ...
   .
 }
} {code}
 


> Spark streaming: Add max pending microbatches conf to skip scheduling new 
> mircobatch
> 
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.4.0
>Reporter: Anil Dasari
>Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming 
> will continue to add microbatches to the eventloop and submit the job to the 
> job thread executor. Consequently, pending microbatches hold fewer offset 
> ranges in Spark streaming Kafka if the kafka lag is less than the configured 
> maximum per partition. 
> We rely on the third-party service to add additional metadata to incoming 
> records, and its response times remain constant regardless of microbatch 
> size. So, Small microbatches can increase latencies further. An RDD's 
> metadata is fetched during the transform phase in our case for various 
> reasons, which is executed when micorbatch is scheduled. Our RDD transform on 
> high level :
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>  {   
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMedatada(uniqueItems)
>   val rddWithMedatadata = rdd.map(...) // adds 

[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new mircobatch

2023-06-16 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated SPARK-44083:

Issue Type: Improvement  (was: New Feature)

> Spark streaming: Add max pending microbatches conf to skip scheduling new 
> mircobatch
> 
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.4.0
>Reporter: Anil Dasari
>Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming 
> will continue to add microbatches to the eventloop and submit the job to the 
> job thread executor. Consequently, pending microbatches hold fewer offset 
> ranges in Spark streaming Kafka if the kafka lag is less than the configured 
> maximum per partition. 
> We rely on the third-party service to add additional metadata to incoming 
> records, and its response times remain constant regardless of microbatch 
> size. So, Small microbatches can increase latencies further. An RDD's 
> metadata is fetched during the transform phase in our case for various 
> reasons, which is executed when micorbatch is scheduled. Our RDD transform on 
> high level :
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>  {   
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMedatada(uniqueItems)
>   val rddWithMedatadata = rdd.map(...) // adds metadata  
>
>   rddWithMedatadata
>  })
> {code}
>  
> Scheduling many small microbatches can be avoided by skipping new jobs when 
> there are sufficient pending jobs in the queue. 
> Proposed changes in _JobExecutor.scala_ on high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", 
> -1)
> private def processEvent(event: JobGeneratorEvent): Unit = {
>  logDebug("Got event " + event)
>  event match {
>   case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < 
> maxPendingJobs){ 
>             generateJobs(time)
>        }else { 
>          logWarning("Skipping JobGenerator at " + time)   // TODO: add 
> pending times in queue to log.   
>   }
>   
>   // other current cases
>   case ...
>    .
>  }
> } {code}
>  





[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new mircobatch

2023-06-16 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated SPARK-44083:

Description: 
In the case of uneven incoming rates and high scheduling delays, streaming will 
continue to add microbatches to the eventloop and submit the job to the job 
thread executor. Consequently, pending microbatches hold fewer offset ranges in 
Spark streaming Kafka if the kafka lag is less than the configured maximum per 
partition. 

We rely on the third-party service to add additional metadata to incoming 
records, and its response times remain constant regardless of microbatch size. 
An RDD's metadata is fetched during the transform phase in our case if various 
reasons, which is executed when micorbatch is scheduled. Our RDD transform on 
high level :
{code:java}
val dstreams = ...
dstreams.transform(rdd =>
 {   
  val uniqueItems = rdd.map(..).distinct.collect
  val metadata = getMedatada(uniqueItems)
  val rddWithMedatadata = rdd.map(...) // adds metadata  
   
  rddWithMedatadata
 })
{code}
 

Scheduling small microbatches can be avoided by skipping new jobs when there 
are sufficient pending jobs in the queue. 

 

Proposed changes in _JobExecutor.scala_ on high level:
{code:java}
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
 logDebug("Got event " + event)
 event match {
  case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < 
maxPendingJobs){ 
            generateJobs(time)
       }else { 
         logWarning("Skipping JobGenerator at " + time)   // TODO: add pending 
times in queue to log.   
  }
  
  // other current cases
  case ...
   .
 }
} {code}
 

  was:
In the case of uneven incoming rates and high scheduling delays, streaming will 
continue to add microbatches to the eventloop and submit the job to the job 
thread executor. Consequently, pending microbatches hold fewer offset ranges in 
Spark streaming Kafka if the kafka lag is less than the configured maximum per 
partition. 

We rely on the third-party service to add additional metadata to incoming 
records, and its response times remain constant regardless of microbatch size. 
An RDD's metadata is fetched during the transform phase in our case if various 
reasons, which is executed when micorbatch is scheduled. Our RDD transform on 
high level :
{code:java}
val dstreams = ...
dstreams.transform(rdd =>
 {   
  val uniqueItems = rdd.map(..).distinct.collect
  val metadata = getMedatada(uniqueItems)
  val rddWithMedatadata = rdd.map(...) // adds metadata  
   
  rddWithMedatadata
 })
{code}
 

Scheduling small microbatches can be avoided by skipping new jobs when there 
are sufficient pending jobs in the queue. 

 

Proposed changes in _JobExecutor.scala_ on high level:
{code:java}
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
 logDebug("Got event " + event)
 event match {
  case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < 
maxPendingJobs){ 
            generateJobs(time)
       }else { 
         logWarning("Skipping JobGenerator at " + time)   // adding pending 
times in queue.   
  }
  
  // other current cases
  case ...
   .
 }
} {code}
 


> Spark streaming: Add max pending microbatches conf to skip scheduling new 
> mircobatch
> 
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.4.0
>Reporter: Anil Dasari
>Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming 
> will continue to add microbatches to the eventloop and submit the job to the 
> job thread executor. Consequently, pending microbatches hold fewer offset 
> ranges in Spark streaming Kafka if the kafka lag is less than the configured 
> maximum per partition. 
> We rely on the third-party service to add additional metadata to incoming 
> records, and its response times remain constant regardless of microbatch 
> size. An RDD's metadata is fetched during the transform phase in our case if 
> various reasons, which is executed when micorbatch is scheduled. Our RDD 
> transform on high level :
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>  {   
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMedatada(uniqueItems)
>   val rddWithMedatadata = rdd.map(...) // adds metadata  
>
>   rddWithMedatadata
>  })
> {code}
>  
> Scheduling small microbatches can be avoided by skipping new jobs when 

[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to avoid scheduling new mircobatch

2023-06-16 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated SPARK-44083:

Summary: Spark streaming: Add max pending microbatches conf to avoid 
scheduling new mircobatch  (was: Spark streaming: Add max pending microbatches 
conf to avoid new pending mircobatch)

> Spark streaming: Add max pending microbatches conf to avoid scheduling new 
> mircobatch
> -
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.4.0
>Reporter: Anil Dasari
>Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming 
> will continue to add microbatches to the eventloop and submit the job to the 
> job thread executor. Consequently, pending microbatches hold fewer offset 
> ranges in Spark streaming Kafka if the kafka lag is less than the configured 
> maximum per partition. 
> We rely on the third-party service to add additional metadata to incoming 
> records, and its response times remain constant regardless of microbatch 
> size. An RDD's metadata is fetched during the transform phase in our case if 
> various reasons, which is executed when micorbatch is scheduled. Our RDD 
> transform on high level :
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>  {   
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMedatada(uniqueItems)
>   val rddWithMedatadata = rdd.map(...) // adds metadata  
>
>   rddWithMedatadata
>  })
> {code}
>  
> Scheduling small microbatches can be avoided by skipping new jobs when there 
> are sufficient pending jobs in the queue. 
>  
> Proposed changes in _JobExecutor.scala_ on high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", 
> -1)
> private def processEvent(event: JobGeneratorEvent): Unit = {
>  logDebug("Got event " + event)
>  event match {
>   case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < 
> maxPendingJobs){ 
>             generateJobs(time)
>        }else { 
>          logWarning("Skipping JobGenerator at " + time)   // adding pending 
> times in queue.   
>   }
>   
>   // other current cases
>   case ...
>    .
>  }
> } {code}
>  





[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new mircobatch

2023-06-16 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated SPARK-44083:

Summary: Spark streaming: Add max pending microbatches conf to skip 
scheduling new mircobatch  (was: Spark streaming: Add max pending microbatches 
conf to avoid scheduling new mircobatch)

> Spark streaming: Add max pending microbatches conf to skip scheduling new 
> mircobatch
> 
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.4.0
>Reporter: Anil Dasari
>Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming 
> will continue to add microbatches to the eventloop and submit the job to the 
> job thread executor. Consequently, pending microbatches hold fewer offset 
> ranges in Spark streaming Kafka if the kafka lag is less than the configured 
> maximum per partition. 
> We rely on the third-party service to add additional metadata to incoming 
> records, and its response times remain constant regardless of microbatch 
> size. An RDD's metadata is fetched during the transform phase in our case if 
> various reasons, which is executed when micorbatch is scheduled. Our RDD 
> transform on high level :
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>  {   
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMedatada(uniqueItems)
>   val rddWithMedatadata = rdd.map(...) // adds metadata  
>
>   rddWithMedatadata
>  })
> {code}
>  
> Scheduling small microbatches can be avoided by skipping new jobs when there 
> are sufficient pending jobs in the queue. 
>  
> Proposed changes in _JobExecutor.scala_ on high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", 
> -1)
> private def processEvent(event: JobGeneratorEvent): Unit = {
>  logDebug("Got event " + event)
>  event match {
>   case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < 
> maxPendingJobs){ 
>             generateJobs(time)
>        }else { 
>          logWarning("Skipping JobGenerator at " + time)   // adding pending 
> times in queue.   
>   }
>   
>   // other current cases
>   case ...
>    .
>  }
> } {code}
>  





[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to avoid new pending mircobatch

2023-06-16 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated SPARK-44083:

Description: 
In the case of uneven incoming rates and high scheduling delays, streaming will 
continue to add microbatches to the eventloop and submit the job to the job 
thread executor. Consequently, pending microbatches hold fewer offset ranges in 
Spark streaming Kafka if the kafka lag is less than the configured maximum per 
partition. 

We rely on the third-party service to add additional metadata to incoming 
records, and its response times remain constant regardless of microbatch size. 
An RDD's metadata is fetched during the transform phase in our case if various 
reasons, which is executed when micorbatch is scheduled. Our RDD transform on 
high level :
{code:java}
val dstreams = ...
dstreams.transform(rdd =>
 {   
  val uniqueItems = rdd.map(..).distinct.collect
  val metadata = getMedatada(uniqueItems)
  val rddWithMedatadata = rdd.map(...) // adds metadata  
   
  rddWithMedatadata
 })
{code}
 

Scheduling small microbatches can be avoided by skipping new jobs when there 
are sufficient pending jobs in the queue. 

 

Proposed changes in _JobExecutor.scala_ on high level:
{code:java}
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
 logDebug("Got event " + event)
 event match {
  case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < 
maxPendingJobs){ 
            generateJobs(time)
       }else { 
         logWarning("Skipping JobGenerator at " + time)   // adding pending 
times in queue.   
  }
  
  // other current cases
  case ...
   .
 }
} {code}
 

  was:
In the case of uneven incoming rates and high scheduling delays, streaming will 
continue to add microbatches to the eventloop and submit the job to the job 
thread executor. Consequently, pending microbatches hold fewer offset ranges in 
Spark streaming Kafka if the kafka lag is less than the configured maximum per 
partition. 

We rely on the third-party service to add additional metadata to incoming 
records, and its response times remain constant regardless of microbatch size. 
An RDD's metadata is fetched during the transform phase in our case if various 
reasons, which is executed when micorbatch is scheduled. Our RDD transform on 
high level :

```

va dstreams = ...

dstreams.transform(rdd => {

  val uniqueItems = rdd.map(..).distinct.collect

  val metadata = getMedatada(uniqueItems)

 val rddWithMedatadata = rdd.map(...). // adds metadata

  rddWithMedatadata

})

// other 

```

Scheduling small microbatches can be avoided by skipping new jobs when there 
are sufficient pending jobs in the queue. 

 

Proposed changes on high level:

```

val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
 logDebug("Got event " + event)
 event match {
  case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < 
maxPendingJobs) {
            generateJobs(time)
      } else {
         logWarning("*** Skipping JobGenerator at  " + time)
      }

 // other current cases
  case ...

   .
 }
}

```

 

 


> Spark streaming: Add max pending microbatches conf to avoid new pending 
> mircobatch
> --
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.4.0
>Reporter: Anil Dasari
>Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming 
> will continue to add microbatches to the eventloop and submit the job to the 
> job thread executor. Consequently, pending microbatches hold fewer offset 
> ranges in Spark streaming Kafka if the kafka lag is less than the configured 
> maximum per partition. 
> We rely on the third-party service to add additional metadata to incoming 
> records, and its response times remain constant regardless of microbatch 
> size. An RDD's metadata is fetched during the transform phase in our case if 
> various reasons, which is executed when micorbatch is scheduled. Our RDD 
> transform on high level :
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>  {   
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMedatada(uniqueItems)
>   val rddWithMedatadata = rdd.map(...) // adds metadata  
>
>   rddWithMedatadata
>  })
> {code}
>  
> Scheduling small microbatches can be avoided by skipping new jobs when there 
> are sufficient pending jobs in the queue. 
>  
> Proposed changes 

[jira] [Created] (SPARK-44083) Spark streaming: Add max pending microbatches conf to avoid new pending mircobatch

2023-06-16 Thread Anil Dasari (Jira)
Anil Dasari created SPARK-44083:
---

 Summary: Spark streaming: Add max pending microbatches conf to 
avoid new pending mircobatch
 Key: SPARK-44083
 URL: https://issues.apache.org/jira/browse/SPARK-44083
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 3.4.0
Reporter: Anil Dasari


In the case of uneven incoming rates and high scheduling delays, streaming will 
continue to add microbatches to the event loop and submit the jobs to the job thread 
executor. Consequently, pending microbatches hold fewer offset ranges in Spark 
Streaming + Kafka if the Kafka lag is less than the configured maximum per partition. 

We rely on a third-party service to add additional metadata to incoming records, and 
its response times remain constant regardless of microbatch size. In our case, an 
RDD's metadata is fetched during the transform phase for various reasons, and the 
transform is executed when the microbatch is scheduled. Our RDD transform at a high 
level:

```
val dstreams = ...

dstreams.transform(rdd => {
  val uniqueItems = rdd.map(..).distinct.collect
  val metadata = getMedatada(uniqueItems)
  val rddWithMedatadata = rdd.map(...) // adds metadata
  rddWithMedatadata
})

// other
```

Scheduling small microbatches can be avoided by skipping new jobs when there 
are sufficient pending jobs in the queue. 

 

Proposed changes on high level:

```
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
        generateJobs(time)
      } else {
        logWarning("*** Skipping JobGenerator at " + time)
      }

    // other current cases
    case ...
  }
}
```

 

 





[jira] [Comment Edited] (SPARK-40507) Spark creates an optional columns in hive table for fields that are not null

2022-10-06 Thread Anil Dasari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17613787#comment-17613787
 ] 

Anil Dasari edited comment on SPARK-40507 at 10/6/22 8:48 PM:
--

Does anyone have any thoughts on this?


was (Author: JIRAUSER283879):
does anyone have an thoughts on this ?

> Spark creates an optional columns in hive table for fields that are not null
> 
>
> Key: SPARK-40507
> URL: https://issues.apache.org/jira/browse/SPARK-40507
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Anil Dasari
>Priority: Major
>
> DataFrame saveAsTable sets all columns as optional/nullable while creating the 
> table here: 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]
> (`outputColumns.toStructType.asNullable`)
> This makes the source Parquet schema and the Hive table schema mismatch, which is 
> problematic when a large-dataframe process uses Hive as temporary storage to avoid 
> memory pressure. 
> Hive 3.x supports non-null constraints on table columns. Please add support for 
> non-null constraints on Spark SQL Hive tables. 





[jira] [Commented] (SPARK-40507) Spark creates an optional columns in hive table for fields that are not null

2022-10-06 Thread Anil Dasari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17613787#comment-17613787
 ] 

Anil Dasari commented on SPARK-40507:
-

Does anyone have any thoughts on this?

> Spark creates an optional columns in hive table for fields that are not null
> 
>
> Key: SPARK-40507
> URL: https://issues.apache.org/jira/browse/SPARK-40507
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Anil Dasari
>Priority: Major
>
> DataFrame saveAsTable sets all columns as optional/nullable while creating the 
> table here: 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]
> (`outputColumns.toStructType.asNullable`)
> This makes the source Parquet schema and the Hive table schema mismatch, which is 
> problematic when a large-dataframe process uses Hive as temporary storage to avoid 
> memory pressure. 
> Hive 3.x supports non-null constraints on table columns. Please add support for 
> non-null constraints on Spark SQL Hive tables. 





[jira] [Updated] (SPARK-40507) Spark creates an optional columns in hive table for fields that are not null

2022-09-20 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated SPARK-40507:

Description: 
DataFrame saveAsTable sets all columns as optional/nullable while creating the 
table here: 

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]

(`outputColumns.toStructType.asNullable`)

This makes the source Parquet schema and the Hive table schema mismatch, which is 
problematic when a large-dataframe process uses Hive as temporary storage to avoid 
memory pressure. 

Hive 3.x supports non-null constraints on table columns. Please add support for 
non-null constraints on Spark SQL Hive tables. 

  was:
Dataframe saveAsTable sets all columns as optional/nullable while creating the 
table here  

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]

(`outputColumns.toStructType.asNullable`)

This makes source parquet schema and hive table schema doesn't match and is 
problematic when large dataframe(s) process uses hive as temporary storage to 
avoid the memory pressure. 

Hive 3.x supports non null constraints on table columns. Please add support non 
null constraints on Spark sql hive table. 


> Spark creates an optional columns in hive table for fields that are not null
> 
>
> Key: SPARK-40507
> URL: https://issues.apache.org/jira/browse/SPARK-40507
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Anil Dasari
>Priority: Major
>
> Dataframe saveAsTable sets all columns as optional/nullable while creating 
> the table here  
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]
> (`outputColumns.toStructType.asNullable`)
> This makes source parquet schema and hive table schema doesn't match and is 
> problematic when large dataframe(s) process uses hive as temporary storage to 
> avoid the memory pressure. 
> Hive 3.x supports non null constraints on table columns. Please add support 
> for non null constraints on Spark sql hive table. 





[jira] [Created] (SPARK-40507) Spark creates an optional columns in hive table for fields that are not null

2022-09-20 Thread Anil Dasari (Jira)
Anil Dasari created SPARK-40507:
---

 Summary: Spark creates an optional columns in hive table for 
fields that are not null
 Key: SPARK-40507
 URL: https://issues.apache.org/jira/browse/SPARK-40507
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Anil Dasari


DataFrame saveAsTable sets all columns as optional/nullable while creating the 
table here: 

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]

(`outputColumns.toStructType.asNullable`)

This makes the source Parquet schema and the Hive table schema mismatch, which is 
problematic when a large-dataframe process uses Hive as temporary storage to avoid 
memory pressure. 

Hive 3.x supports non-null constraints on table columns. Please add support for 
non-null constraints on Spark SQL Hive tables. 
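A minimal sketch, assuming a Hive-enabled session, that illustrates the behavior described above: even when the source DataFrame schema declares columns as non-nullable, the table created by saveAsTable reports every column as nullable. The app and table names are placeholders.
{code:java}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .appName("saveAsTable-nullability-check")
  .enableHiveSupport()
  .getOrCreate()

// Source schema with explicitly non-nullable columns.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false)))

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(1L, "a"), Row(2L, "b"))), schema)

df.write.mode("overwrite").saveAsTable("nullability_check")

df.printSchema()                                // id, name: nullable = false
spark.table("nullability_check").printSchema()  // per this report: nullable = true
{code}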


