[jira] [Commented] (SPARK-48418) Spark structured streaming: Add microbatch timestamp to foreachBatch method
    [ https://issues.apache.org/jira/browse/SPARK-48418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849465#comment-17849465 ]

Anil Dasari commented on SPARK-48418:
-------------------------------------

Using the current time as the batch time inside foreachBatch might not work, because that value is not available in the query progress and so cannot be mapped back to it.

> Spark structured streaming: Add microbatch timestamp to foreachBatch method
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-48418
>                 URL: https://issues.apache.org/jira/browse/SPARK-48418
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.5.1
>            Reporter: Anil Dasari
>            Priority: Major
>
> We are on Spark 3.x, currently using Spark DStream + Kafka, and are planning to move to Structured Streaming + Kafka.
> Differences between the DStream microbatch metadata and the Structured Streaming microbatch metadata are making the migration difficult.
> DStream#foreachRDD gives both the microbatch RDD and its start timestamp (as a long).
> However, Structured Streaming's DataStreamWriter#foreachBatch provides only the microbatch dataset and the batch ID, where the batch ID is a number.
> The microbatch start time is used across our data pipelines and in the final results.
> Could you add the microbatch start timestamp to the DataStreamWriter#foreachBatch method?
> Pseudo code:
>
> {code:java}
> val inputStream = sparkSession.readStream.format("rate").load
>
> inputStream
>   .writeStream
>   .trigger(Trigger.ProcessingTime(10 * 1000))
>   .foreachBatch {
>     (ds: Dataset[Row], batchId: Long, batchTime: Long) => // batchTime is the microbatch trigger/start timestamp
>
>       // application logic.
>   }
>   .start()
>   .awaitTermination() {code}
>
> Implementation approach when batchTime is the time at which the trigger executor fires
> (`currentTriggerStartTimestamp` could also be used as the batch time; the trigger executor time is the origin of the microbatch and can also easily be added to the query progress event):
> 1. Add the trigger time to [TriggerExecutor|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L31]
>
> {code:java}
> trait TriggerExecutor {
>   // added batchTime (Long) argument
>   def execute(batchRunner: (MicroBatchExecutionContext, Long) => Boolean): Unit
>
>   ... // (other methods)
> }{code}
> 2. Update ProcessingTimeExecutor and the other executors to pass the trigger time.
> {code:java}
> override def execute(triggerHandler: (MicroBatchExecutionContext, Long) => Boolean): Unit = {
>   while (true) {
>     val triggerTimeMs = clock.getTimeMillis()
>     val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
>
>     // pass triggerTimeMs to runOneBatch, which invokes triggerHandler and is used in the
>     // MicroBatchExecution#runActivatedStream method.
>     val terminated = !runOneBatch(triggerHandler, triggerTimeMs)
>
>     ...
>   }
> } {code}
> 3. Add an executionTime (Long) argument to the MicroBatchExecution#executeOneBatch method [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L330]
> 4. Pass the execution time in [runBatch|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L380C11-L380C19] and [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L849]
> 5. Finally, add the following `foreachBatch` method to `DataStreamWriter`, update the existing `foreachBatch` methods for the new argument, and also add the batch time to the query progress event.
> {code:java}
> def foreachBatch(function: (Dataset[T], Long, Long) => Unit): DataStreamWriter[T] = {
>   this.source = SOURCE_NAME_FOREACH_BATCH
>   if (function == null) throw new IllegalArgumentException("foreachBatch function cannot be null")
>   this.foreachBatchWriter = function
>   this
> }{code}
> Let me know your thoughts.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
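The idea in steps 1 and 2 of the quoted proposal, where the trigger loop reads the clock once and hands that same value to the batch runner, can be sketched in plain Scala without Spark. Everything below is a hypothetical, simplified stand-in written for illustration: `ManualClock`, this `TriggerExecutor`, this `ProcessingTimeExecutor` and `TriggerTimeDemo` are not Spark's classes, and the real `MicroBatchExecutionContext` argument is omitted.

```scala
// Hypothetical, simplified stand-ins for illustration; these are not Spark's classes.
final class ManualClock(private var nowMs: Long) {
  def getTimeMillis(): Long = nowMs
  def advance(ms: Long): Unit = nowMs += ms
}

trait TriggerExecutor {
  // The batch runner receives the trigger time and returns false to stop the loop.
  def execute(batchRunner: Long => Boolean): Unit
}

final class ProcessingTimeExecutor(intervalMs: Long, clock: ManualClock) extends TriggerExecutor {
  require(intervalMs > 0, "intervalMs must be positive")

  // First interval boundary strictly after `nowMs`.
  private def nextBatchTime(nowMs: Long): Long = (nowMs / intervalMs + 1) * intervalMs

  override def execute(batchRunner: Long => Boolean): Unit = {
    var running = true
    while (running) {
      val triggerTimeMs = clock.getTimeMillis()
      val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
      running = batchRunner(triggerTimeMs)
      // Stand-in for sleeping until the next trigger; never moves the clock backwards.
      if (running) clock.advance(math.max(0L, nextTriggerTimeMs - clock.getTimeMillis()))
    }
  }
}

object TriggerTimeDemo {
  // Runs three simulated batches and returns (batchId, triggerTimeMs) pairs.
  def run(): Vector[(Long, Long)] = {
    val clock = new ManualClock(0L)
    val executor = new ProcessingTimeExecutor(10000L, clock)
    var batchId = 0L
    var seen = Vector.empty[(Long, Long)]
    executor.execute { triggerTimeMs =>
      clock.advance(1500L) // pretend the batch itself takes 1.5 s
      seen = seen :+ (batchId -> triggerTimeMs)
      batchId += 1
      batchId < 3
    }
    seen
  }
}
```

In this simulation the handler always sees the trigger time captured by the loop (0, 10000, 20000), even though the simulated clock has already moved on by the time the handler body finishes. A wall-clock reading taken inside the handler would drift from the trigger time in the same way, which is the mismatch the comment above describes.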
[jira] [Updated] (SPARK-48418) Spark structured streaming: Add microbatch timestamp to foreachBatch method
     [ https://issues.apache.org/jira/browse/SPARK-48418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anil Dasari updated SPARK-48418:
    Description: 
We are on Spark 3.x, currently using Spark DStream + Kafka, and are planning to move to Structured Streaming + Kafka.

Differences between the DStream microbatch metadata and the Structured Streaming microbatch metadata are making the migration difficult.

DStream#foreachRDD gives both the microbatch RDD and its start timestamp (as a long). However, Structured Streaming's DataStreamWriter#foreachBatch provides only the microbatch dataset and the batch ID, where the batch ID is a number.

The microbatch start time is used across our data pipelines and in the final results.

Could you add the microbatch start timestamp to the DataStreamWriter#foreachBatch method?

Pseudo code:

{code:java}
val inputStream = sparkSession.readStream.format("rate").load

inputStream
  .writeStream
  .trigger(Trigger.ProcessingTime(10 * 1000))
  .foreachBatch {
    (ds: Dataset[Row], batchId: Long, batchTime: Long) => // batchTime is the microbatch trigger/start timestamp

      // application logic.
  }
  .start()
  .awaitTermination() {code}

Implementation approach when batchTime is the time at which the trigger executor fires
(`currentTriggerStartTimestamp` could also be used as the batch time; the trigger executor time is the origin of the microbatch and can also easily be added to the query progress event):

1. Add the trigger time to [TriggerExecutor|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L31]
{code:java}
trait TriggerExecutor {
  // added batchTime (Long) argument
  def execute(batchRunner: (MicroBatchExecutionContext, Long) => Boolean): Unit

  ... // (other methods)
}{code}
2. Update ProcessingTimeExecutor and the other executors to pass the trigger time.
{code:java}
override def execute(triggerHandler: (MicroBatchExecutionContext, Long) => Boolean): Unit = {
  while (true) {
    val triggerTimeMs = clock.getTimeMillis()
    val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)

    // pass triggerTimeMs to runOneBatch, which invokes triggerHandler and is used in the
    // MicroBatchExecution#runActivatedStream method.
    val terminated = !runOneBatch(triggerHandler, triggerTimeMs)

    ...
  }
} {code}
3. Add an executionTime (Long) argument to the MicroBatchExecution#executeOneBatch method [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L330]
4. Pass the execution time in [runBatch|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L380C11-L380C19] and [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L849]
5. Finally, add the following `foreachBatch` method to `DataStreamWriter`, update the existing `foreachBatch` methods for the new argument, and also add the batch time to the query progress event.
{code:java}
def foreachBatch(function: (Dataset[T], Long, Long) => Unit): DataStreamWriter[T] = {
  this.source = SOURCE_NAME_FOREACH_BATCH
  if (function == null) throw new IllegalArgumentException("foreachBatch function cannot be null")
  this.foreachBatchWriter = function
  this
}{code}
Let me know your thoughts.

  was:
We are on Spark 3.x, currently using Spark DStream + Kafka, and are planning to move to Structured Streaming + Kafka.

Differences between the DStream microbatch metadata and the Structured Streaming microbatch metadata are making the migration difficult.

DStream#foreachRDD gives both the microbatch RDD and its start timestamp (as a long). However, Structured Streaming's DataStreamWriter#foreachBatch provides only the microbatch dataset and the batch ID, where the batch ID is a number.

The microbatch start time is used across our data pipelines and in the final results.

Could you add the microbatch start timestamp to the DataStreamWriter#foreachBatch method?

Pseudo code:

{code:java}
val inputStream = sparkSession.readStream.format("rate").load

inputStream
  .writeStream
  .trigger(Trigger.ProcessingTime(10 * 1000))
  .foreachBatch {
    (ds: Dataset[Row], batchId: Long, batchTime: Long) => // batchTime is the microbatch trigger/start timestamp

      // application logic.
  }
  .start()
  .awaitTermination() {code}

Implementation approach when batchTime is the time at which the trigger executor fires
(`currentTriggerStartTimestamp` could also be used as the batch time; the trigger executor time is the origin of the microbatch and can also easily be added to the query progress event):
 # Add the trigger time to [TriggerExecutor|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L31]
{code:java}
trait TriggerExecutor {
  // added batchTime (Long) argument
  def execute(batchRunner: (MicroBatchExecutionContext, Long) => Boolean): Unit

  ... // (other methods)
}{code}
 #
[jira] [Created] (SPARK-48418) Spark structured streaming: Add microbatch timestamp to foreachBatch method
Anil Dasari created SPARK-48418:
-----------------------------------

             Summary: Spark structured streaming: Add microbatch timestamp to foreachBatch method
                 Key: SPARK-48418
                 URL: https://issues.apache.org/jira/browse/SPARK-48418
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 3.5.1
            Reporter: Anil Dasari


We are on Spark 3.x, currently using Spark DStream + Kafka, and are planning to move to Structured Streaming + Kafka.

Differences between the DStream microbatch metadata and the Structured Streaming microbatch metadata are making the migration difficult.

DStream#foreachRDD gives both the microbatch RDD and its start timestamp (as a long). However, Structured Streaming's DataStreamWriter#foreachBatch provides only the microbatch dataset and the batch ID, where the batch ID is a number.

The microbatch start time is used across our data pipelines and in the final results.

Could you add the microbatch start timestamp to the DataStreamWriter#foreachBatch method?

Pseudo code:

{code:java}
val inputStream = sparkSession.readStream.format("rate").load

inputStream
  .writeStream
  .trigger(Trigger.ProcessingTime(10 * 1000))
  .foreachBatch {
    (ds: Dataset[Row], batchId: Long, batchTime: Long) => // batchTime is the microbatch trigger/start timestamp

      // application logic.
  }
  .start()
  .awaitTermination() {code}

Implementation approach when batchTime is the time at which the trigger executor fires
(`currentTriggerStartTimestamp` could also be used as the batch time; the trigger executor time is the origin of the microbatch and can also easily be added to the query progress event):
 # Add the trigger time to [TriggerExecutor|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L31]
{code:java}
trait TriggerExecutor {
  // added batchTime (Long) argument
  def execute(batchRunner: (MicroBatchExecutionContext, Long) => Boolean): Unit

  ... // (other methods)
}{code}
 # Update ProcessingTimeExecutor and the other executors to pass the trigger time.
{code:java}
override def execute(triggerHandler: (MicroBatchExecutionContext, Long) => Boolean): Unit = {
  while (true) {
    val triggerTimeMs = clock.getTimeMillis()
    val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)

    // pass triggerTimeMs to runOneBatch, which invokes triggerHandler and is used in the
    // MicroBatchExecution#runActivatedStream method.
    val terminated = !runOneBatch(triggerHandler, triggerTimeMs)

    ...
  }
} {code}
 # Add an executionTime (Long) argument to the MicroBatchExecution#executeOneBatch method [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L330]
 # Pass the execution time in [runBatch|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L380C11-L380C19] and [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L849]
 # Finally, add the following `foreachBatch` method to `DataStreamWriter`, update the existing `foreachBatch` methods for the new argument, and also add the batch time to the query progress event.
{code:java}
def foreachBatch(function: (Dataset[T], Long, Long) => Unit): DataStreamWriter[T] = {
  this.source = SOURCE_NAME_FOREACH_BATCH
  if (function == null) throw new IllegalArgumentException("foreachBatch function cannot be null")
  this.foreachBatchWriter = function
  this
}{code}
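The last step of the proposal asks for the existing two-argument `foreachBatch` overloads to be updated alongside a new three-argument one. One possible way to keep old handlers working, shown here only as a sketch, is to lift a `(data, batchId)` handler into the proposed `(data, batchId, batchTime)` shape. `ForeachBatchCompat` and `withBatchTime` are hypothetical names invented for this example, and the type parameter `D` stands in for `Dataset[T]` so the snippet runs without Spark.

```scala
object ForeachBatchCompat {
  // Lifts a two-argument handler (data, batchId) into the proposed three-argument
  // shape (data, batchId, batchTime) by ignoring the batch time.
  def withBatchTime[D](legacy: (D, Long) => Unit): (D, Long, Long) => Unit =
    (data, batchId, _) => legacy(data, batchId)
}
```

With an adapter of this kind, a writer could store a single three-argument function internally while still accepting handlers written against the current signature.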
[jira] [Created] (SPARK-44224) Table drop with Purge statement is not deleting the _temporary folder
Anil Dasari created SPARK-44224:
-----------------------------------

             Summary: Table drop with Purge statement is not deleting the _temporary folder
                 Key: SPARK-44224
                 URL: https://issues.apache.org/jira/browse/SPARK-44224
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.3.1
            Reporter: Anil Dasari


When saveAsTable fails for some reason, the `_temporary` folder it created is not deleted by a DROP TABLE with PURGE statement.

At a high level, our data process looks like this:
 # Read data from an external store using load
 # Drop the table with purge, in case the table already exists
 # Write the DataFrame to Hive using dataframe.write.mode(SaveMode.Overwrite).saveAsTable("my_table")

When step 3 fails for some reason and the job is restarted because of YARN maxAttempts, step 2 does not delete the _temporary folder created by step 3, which causes the job to fail with the exception below:

org.apache.spark.sql.AnalysisException: Can not create the managed table("my_table"). The associated location('') already exists.

This was never the case in Spark 2 because of the "spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation" config, which is removed in Spark 3.
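A possible workaround, not something the report itself proposes, is to remove the leftover table location (including any `_temporary` directory) before retrying the write. The sketch below assumes the location is reachable as a local path and uses only `java.nio.file`; on HDFS or an object store the same idea would go through the Hadoop `FileSystem` API instead. `LeftoverLocationCleanup` and `deleteIfExists` are hypothetical names for this example.

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator

object LeftoverLocationCleanup {
  // Deletes `location` and everything under it.
  // Returns true if something was removed, false if the path did not exist.
  def deleteIfExists(location: Path): Boolean = {
    if (!Files.exists(location)) false
    else {
      val walk = Files.walk(location)
      try {
        // Reverse order so children are deleted before their parent directory.
        walk.sorted(Comparator.reverseOrder[Path]()).forEach(p => Files.delete(p))
      } finally walk.close()
      true
    }
  }
}
```

A job following the three steps above could call a helper like this between the drop and the write, so that a restarted attempt does not trip over files left behind by the failed one.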
[jira] [Closed] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
     [ https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anil Dasari closed SPARK-44083.
-------------------------------

> Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-44083
>                 URL: https://issues.apache.org/jira/browse/SPARK-44083
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 3.4.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit the jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.
> We rely on a third-party service to add additional metadata to incoming records, and its response times remain constant regardless of microbatch size, so small microbatches can increase latencies further. In our case, an RDD's metadata is fetched during the transform phase for various reasons, and the transform is executed when the microbatch is scheduled. Our RDD transform, at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>   {
>     val uniqueItems = rdd.map(..).distinct.collect
>     val metadata = getMetadata(uniqueItems)
>     val rddWithMetadata = rdd.map(...) // adds metadata
>
>     rddWithMetadata
>   })
> {code}
>
> Scheduling many small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.
> Proposed changes in _JobExecutor.scala_, at a high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)
>
> private def processEvent(event: JobGeneratorEvent): Unit = {
>   logDebug("Got event " + event)
>   event match {
>     case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
>         generateJobs(time)
>       } else {
>         logWarning("Skipping JobGenerator at " + time) // TODO: add pending times in queue to log.
>       }
>
>     // other current cases
>     case ...
>   }
> } {code}
>
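The gating rule in the quoted proposal (generate a new batch only while fewer than `maxPendingBatches` batches are pending, with -1 meaning no limit) can be modelled on its own in plain Scala. `PendingBatchGate` and its methods are hypothetical names for this sketch; it does not touch Spark's scheduler classes, and the queue merely stands in for whatever `jobScheduler.getPendingTimes()` would report.

```scala
import scala.collection.mutable

// Simplified model of the proposed rule; these names are illustrative, not Spark internals.
final class PendingBatchGate(maxPendingBatches: Int) {
  private val pending = mutable.Queue.empty[Long]

  // Returns true if a batch for `time` was scheduled, false if it was skipped.
  def onGenerateJobs(time: Long): Boolean =
    if (maxPendingBatches == -1 || pending.size < maxPendingBatches) {
      pending.enqueue(time)
      true
    } else false

  // Marks the oldest pending batch as completed and returns its time, if any.
  def onBatchCompleted(): Option[Long] =
    if (pending.isEmpty) None else Some(pending.dequeue())

  // Snapshot of the batch times that are still pending, oldest first.
  def pendingTimes: List[Long] = pending.toList
}
```

With a limit of 2, a third trigger arriving while two batches are still pending is skipped, and scheduling resumes once one of them completes.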
[jira] [Resolved] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
     [ https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anil Dasari resolved SPARK-44083.
---------------------------------
    Resolution: Won't Do

> Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-44083
>                 URL: https://issues.apache.org/jira/browse/SPARK-44083
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 3.4.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit the jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.
> We rely on a third-party service to add additional metadata to incoming records, and its response times remain constant regardless of microbatch size, so small microbatches can increase latencies further. In our case, an RDD's metadata is fetched during the transform phase for various reasons, and the transform is executed when the microbatch is scheduled. Our RDD transform, at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>   {
>     val uniqueItems = rdd.map(..).distinct.collect
>     val metadata = getMetadata(uniqueItems)
>     val rddWithMetadata = rdd.map(...) // adds metadata
>
>     rddWithMetadata
>   })
> {code}
>
> Scheduling many small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.
> Proposed changes in _JobExecutor.scala_, at a high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)
>
> private def processEvent(event: JobGeneratorEvent): Unit = {
>   logDebug("Got event " + event)
>   event match {
>     case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
>         generateJobs(time)
>       } else {
>         logWarning("Skipping JobGenerator at " + time) // TODO: add pending times in queue to log.
>       }
>
>     // other current cases
>     case ...
>   }
> } {code}
>
[jira] [Commented] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
    [ https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17736937#comment-17736937 ]

Anil Dasari commented on SPARK-44083:
-------------------------------------

[~kabhwan] Thanks for the details. I will close the Jira for now and will reopen it if I see similar behavior in structured streaming.

> Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-44083
>                 URL: https://issues.apache.org/jira/browse/SPARK-44083
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 3.4.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit the jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.
> We rely on a third-party service to add additional metadata to incoming records, and its response times remain constant regardless of microbatch size, so small microbatches can increase latencies further. In our case, an RDD's metadata is fetched during the transform phase for various reasons, and the transform is executed when the microbatch is scheduled. Our RDD transform, at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>   {
>     val uniqueItems = rdd.map(..).distinct.collect
>     val metadata = getMetadata(uniqueItems)
>     val rddWithMetadata = rdd.map(...) // adds metadata
>
>     rddWithMetadata
>   })
> {code}
>
> Scheduling many small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.
> Proposed changes in _JobExecutor.scala_, at a high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)
>
> private def processEvent(event: JobGeneratorEvent): Unit = {
>   logDebug("Got event " + event)
>   event match {
>     case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
>         generateJobs(time)
>       } else {
>         logWarning("Skipping JobGenerator at " + time) // TODO: add pending times in queue to log.
>       }
>
>     // other current cases
>     case ...
>   }
> } {code}
>
[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
     [ https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anil Dasari updated SPARK-44083:
    Description: 
In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit the jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.

We rely on a third-party service to add additional metadata to incoming records, and its response times remain constant regardless of microbatch size, so small microbatches can increase latencies further. In our case, an RDD's metadata is fetched during the transform phase for various reasons, and the transform is executed when the microbatch is scheduled. Our RDD transform, at a high level:
{code:java}
val dstreams = ...
dstreams.transform(rdd =>
  {
    val uniqueItems = rdd.map(..).distinct.collect
    val metadata = getMetadata(uniqueItems)
    val rddWithMetadata = rdd.map(...) // adds metadata

    rddWithMetadata
  })
{code}

Scheduling many small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.

Proposed changes in _JobExecutor.scala_, at a high level:
{code:java}
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
        generateJobs(time)
      } else {
        logWarning("Skipping JobGenerator at " + time) // TODO: add pending times in queue to log.
      }

    // other current cases
    case ...
  }
} {code}

  was:
In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit the jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.

We rely on a third-party service to add additional metadata to incoming records, and its response times remain constant regardless of microbatch size. In our case, an RDD's metadata is fetched during the transform phase for various reasons, and the transform is executed when the microbatch is scheduled. Our RDD transform, at a high level:
{code:java}
val dstreams = ...
dstreams.transform(rdd =>
  {
    val uniqueItems = rdd.map(..).distinct.collect
    val metadata = getMetadata(uniqueItems)
    val rddWithMetadata = rdd.map(...) // adds metadata

    rddWithMetadata
  })
{code}

Scheduling small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.

Proposed changes in _JobExecutor.scala_, at a high level:
{code:java}
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
        generateJobs(time)
      } else {
        logWarning("Skipping JobGenerator at " + time) // TODO: add pending times in queue to log.
      }

    // other current cases
    case ...
  }
} {code}


> Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-44083
>                 URL: https://issues.apache.org/jira/browse/SPARK-44083
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.4.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit the jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.
> We rely on a third-party service to add additional metadata to incoming records, and its response times remain constant regardless of microbatch size, so small microbatches can increase latencies further. In our case, an RDD's metadata is fetched during the transform phase for various reasons, and the transform is executed when the microbatch is scheduled. Our RDD transform, at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>   {
>     val uniqueItems = rdd.map(..).distinct.collect
>     val metadata = getMetadata(uniqueItems)
>     val rddWithMetadata = rdd.map(...) // adds
[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
     [ https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anil Dasari updated SPARK-44083:
    Issue Type: Improvement  (was: New Feature)

> Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-44083
>                 URL: https://issues.apache.org/jira/browse/SPARK-44083
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.4.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit the jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.
> We rely on a third-party service to add additional metadata to incoming records, and its response times remain constant regardless of microbatch size, so small microbatches can increase latencies further. In our case, an RDD's metadata is fetched during the transform phase for various reasons, and the transform is executed when the microbatch is scheduled. Our RDD transform, at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>   {
>     val uniqueItems = rdd.map(..).distinct.collect
>     val metadata = getMetadata(uniqueItems)
>     val rddWithMetadata = rdd.map(...) // adds metadata
>
>     rddWithMetadata
>   })
> {code}
>
> Scheduling many small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.
> Proposed changes in _JobExecutor.scala_, at a high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)
>
> private def processEvent(event: JobGeneratorEvent): Unit = {
>   logDebug("Got event " + event)
>   event match {
>     case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
>         generateJobs(time)
>       } else {
>         logWarning("Skipping JobGenerator at " + time) // TODO: add pending times in queue to log.
>       }
>
>     // other current cases
>     case ...
>   }
> } {code}
>
[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
     [ https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anil Dasari updated SPARK-44083:
    Description: 
In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit the jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.

We rely on a third-party service to add additional metadata to incoming records, and its response times remain constant regardless of microbatch size. In our case, an RDD's metadata is fetched during the transform phase for various reasons, and the transform is executed when the microbatch is scheduled. Our RDD transform, at a high level:
{code:java}
val dstreams = ...
dstreams.transform(rdd =>
  {
    val uniqueItems = rdd.map(..).distinct.collect
    val metadata = getMetadata(uniqueItems)
    val rddWithMetadata = rdd.map(...) // adds metadata

    rddWithMetadata
  })
{code}

Scheduling small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.

Proposed changes in _JobExecutor.scala_, at a high level:
{code:java}
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
        generateJobs(time)
      } else {
        logWarning("Skipping JobGenerator at " + time) // TODO: add pending times in queue to log.
      }

    // other current cases
    case ...
  }
} {code}

  was:
In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit the jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.

We rely on a third-party service to add additional metadata to incoming records, and its response times remain constant regardless of microbatch size. In our case, an RDD's metadata is fetched during the transform phase for various reasons, and the transform is executed when the microbatch is scheduled. Our RDD transform, at a high level:
{code:java}
val dstreams = ...
dstreams.transform(rdd =>
  {
    val uniqueItems = rdd.map(..).distinct.collect
    val metadata = getMetadata(uniqueItems)
    val rddWithMetadata = rdd.map(...) // adds metadata

    rddWithMetadata
  })
{code}

Scheduling small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.

Proposed changes in _JobExecutor.scala_, at a high level:
{code:java}
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
        generateJobs(time)
      } else {
        logWarning("Skipping JobGenerator at " + time) // adding pending times in queue.
      }

    // other current cases
    case ...
  }
} {code}


> Spark streaming: Add max pending microbatches conf to skip scheduling new microbatch
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-44083
>                 URL: https://issues.apache.org/jira/browse/SPARK-44083
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.4.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit the jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.
> We rely on a third-party service to add additional metadata to incoming records, and its response times remain constant regardless of microbatch size. In our case, an RDD's metadata is fetched during the transform phase for various reasons, and the transform is executed when the microbatch is scheduled. Our RDD transform, at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd =>
>   {
>     val uniqueItems = rdd.map(..).distinct.collect
>     val metadata = getMetadata(uniqueItems)
>     val rddWithMetadata = rdd.map(...) // adds metadata
>
>     rddWithMetadata
>   })
> {code}
>
> Scheduling small microbatches can be avoided by skipping new jobs when
[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to avoid scheduling new mircobatch
[ https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anil Dasari updated SPARK-44083:
    Summary: Spark streaming: Add max pending microbatches conf to avoid scheduling new mircobatch (was: Spark streaming: Add max pending microbatches conf to avoid new pending mircobatch)

> Spark streaming: Add max pending microbatches conf to avoid scheduling new mircobatch
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Affects Versions: 3.4.0
> Reporter: Anil Dasari
> Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.
>
> We rely on a third-party service to add metadata to incoming records, and its response time remains constant regardless of microbatch size. For various reasons, in our case an RDD's metadata is fetched during the transform phase, which is executed when the microbatch is scheduled. Our RDD transform at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd => {
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMetadata(uniqueItems)
>   val rddWithMetadata = rdd.map(...) // adds metadata
>   rddWithMetadata
> })
> {code}
>
> Scheduling small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.
>
> Proposed changes in _JobGenerator.scala_ at a high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)
>
> private def processEvent(event: JobGeneratorEvent): Unit = {
>   logDebug("Got event " + event)
>   event match {
>     case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
>         generateJobs(time)
>       } else {
>         logWarning("Skipping JobGenerator at " + time) // adding pending times in queue.
>       }
>     // other current cases
>     case ...
>   }
> }
> {code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to skip scheduling new mircobatch
[ https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anil Dasari updated SPARK-44083:
    Summary: Spark streaming: Add max pending microbatches conf to skip scheduling new mircobatch (was: Spark streaming: Add max pending microbatches conf to avoid scheduling new mircobatch)

> Spark streaming: Add max pending microbatches conf to skip scheduling new mircobatch
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Affects Versions: 3.4.0
> Reporter: Anil Dasari
> Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.
>
> We rely on a third-party service to add metadata to incoming records, and its response time remains constant regardless of microbatch size. For various reasons, in our case an RDD's metadata is fetched during the transform phase, which is executed when the microbatch is scheduled. Our RDD transform at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd => {
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMetadata(uniqueItems)
>   val rddWithMetadata = rdd.map(...) // adds metadata
>   rddWithMetadata
> })
> {code}
>
> Scheduling small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.
>
> Proposed changes in _JobGenerator.scala_ at a high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)
>
> private def processEvent(event: JobGeneratorEvent): Unit = {
>   logDebug("Got event " + event)
>   event match {
>     case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
>         generateJobs(time)
>       } else {
>         logWarning("Skipping JobGenerator at " + time) // adding pending times in queue.
>       }
>     // other current cases
>     case ...
>   }
> }
> {code}
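The gating proposed in this issue can be tried out in isolation. What follows is only a minimal sketch in plain Scala with no Spark dependency: `GatedJobGenerator`, `onGenerateJobs`, `onBatchCompleted` and `skippedTimes` are hypothetical names invented for illustration, and the class models just the "skip when enough batches are pending" decision, not Spark's actual job generation.

```scala
import scala.collection.mutable

// Toy model of the proposed gating; none of these names are Spark APIs.
final class GatedJobGenerator(maxPendingJobs: Int) {
  // Batch times that were generated but have not completed yet.
  private val pendingTimes = mutable.Queue.empty[Long]
  // Batch times that were skipped because enough batches were already pending.
  val skippedTimes = mutable.ArrayBuffer.empty[Long]

  /** Returns true if a job was generated for `time`, false if it was skipped. */
  def onGenerateJobs(time: Long): Boolean =
    if (maxPendingJobs == -1 || pendingTimes.size < maxPendingJobs) {
      pendingTimes.enqueue(time)
      true
    } else {
      skippedTimes += time
      false
    }

  /** Marks the oldest pending batch as completed, if there is one. */
  def onBatchCompleted(): Unit =
    if (pendingTimes.nonEmpty) {
      pendingTimes.dequeue()
      ()
    }

  def pendingCount: Int = pendingTimes.size
}

object GatedJobGeneratorDemo {
  def main(args: Array[String]): Unit = {
    val generator = new GatedJobGenerator(maxPendingJobs = 2)
    // The third trigger arrives while two batches are still pending.
    println(Seq(1000L, 2000L, 3000L).map(generator.onGenerateJobs))
    // Once a batch completes, the next trigger is accepted again.
    generator.onBatchCompleted()
    println(generator.onGenerateJobs(4000L))
  }
}
```

As in the proposal, a limit of -1 disables the check, so the default behaviour is unchanged.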
[jira] [Updated] (SPARK-44083) Spark streaming: Add max pending microbatches conf to avoid new pending mircobatch
[ https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anil Dasari updated SPARK-44083:
    Description:
In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.

We rely on a third-party service to add metadata to incoming records, and its response time remains constant regardless of microbatch size. For various reasons, in our case an RDD's metadata is fetched during the transform phase, which is executed when the microbatch is scheduled. Our RDD transform at a high level:
{code:java}
val dstreams = ...
dstreams.transform(rdd => {
  val uniqueItems = rdd.map(..).distinct.collect
  val metadata = getMetadata(uniqueItems)
  val rddWithMetadata = rdd.map(...) // adds metadata
  rddWithMetadata
})
{code}
Scheduling small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.

Proposed changes in _JobGenerator.scala_ at a high level:
{code:java}
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
        generateJobs(time)
      } else {
        logWarning("Skipping JobGenerator at " + time) // adding pending times in queue.
      }
    // other current cases
    case ...
  }
}
{code}

  was:
In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.

We rely on a third-party service to add metadata to incoming records, and its response time remains constant regardless of microbatch size. For various reasons, in our case an RDD's metadata is fetched during the transform phase, which is executed when the microbatch is scheduled. Our RDD transform at a high level:

```
val dstreams = ...
dstreams.transform(rdd => {
  val uniqueItems = rdd.map(..).distinct.collect
  val metadata = getMetadata(uniqueItems)
  val rddWithMetadata = rdd.map(...) // adds metadata
  rddWithMetadata
})
// other
```

Scheduling small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.

Proposed changes at a high level:

```
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
        generateJobs(time)
      } else {
        logWarning("*** Skipping JobGenerator at " + time)
      }
    // other current cases
    case ...
  }
}
```

> Spark streaming: Add max pending microbatches conf to avoid new pending mircobatch
>
> Key: SPARK-44083
> URL: https://issues.apache.org/jira/browse/SPARK-44083
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Affects Versions: 3.4.0
> Reporter: Anil Dasari
> Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.
>
> We rely on a third-party service to add metadata to incoming records, and its response time remains constant regardless of microbatch size. For various reasons, in our case an RDD's metadata is fetched during the transform phase, which is executed when the microbatch is scheduled. Our RDD transform at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd => {
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMetadata(uniqueItems)
>   val rddWithMetadata = rdd.map(...) // adds metadata
>   rddWithMetadata
> })
> {code}
>
> Scheduling small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.
>
> Proposed changes
[jira] [Created] (SPARK-44083) Spark streaming: Add max pending microbatches conf to avoid new pending mircobatch
Anil Dasari created SPARK-44083:

Summary: Spark streaming: Add max pending microbatches conf to avoid new pending mircobatch
Key: SPARK-44083
URL: https://issues.apache.org/jira/browse/SPARK-44083
Project: Spark
Issue Type: New Feature
Components: Structured Streaming
Affects Versions: 3.4.0
Reporter: Anil Dasari

In the case of uneven incoming rates and high scheduling delays, streaming will continue to add microbatches to the event loop and submit jobs to the job thread executor. Consequently, pending microbatches hold fewer offset ranges in Spark Streaming Kafka if the Kafka lag is less than the configured maximum per partition.

We rely on a third-party service to add metadata to incoming records, and its response time remains constant regardless of microbatch size. For various reasons, in our case an RDD's metadata is fetched during the transform phase, which is executed when the microbatch is scheduled. Our RDD transform at a high level:

```
val dstreams = ...
dstreams.transform(rdd => {
  val uniqueItems = rdd.map(..).distinct.collect
  val metadata = getMetadata(uniqueItems)
  val rddWithMetadata = rdd.map(...) // adds metadata
  rddWithMetadata
})
// other
```

Scheduling small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.

Proposed changes at a high level:

```
val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
        generateJobs(time)
      } else {
        logWarning("*** Skipping JobGenerator at " + time)
      }
    // other current cases
    case ...
  }
}
```
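To make the cost argument concrete, here is a small sketch of the per-batch lookup pattern described in this issue, written against plain Scala collections rather than RDDs. `Record`, `Enriched`, `BatchEnrichment` and the body of `getMetadata` are assumptions made for illustration; the issue does not show the real types or the third-party service call.

```scala
// Hypothetical record types; the issue does not describe the real ones.
final case class Record(itemId: String, payload: String)
final case class Enriched(record: Record, metadata: String)

object BatchEnrichment {
  // Counts calls to the stand-in service: one call per batch, whatever its size.
  var lookupCalls = 0

  // Stand-in for the third-party metadata service.
  def getMetadata(itemIds: Set[String]): Map[String, String] = {
    lookupCalls += 1
    itemIds.map(id => id -> s"meta-for-$id").toMap
  }

  // Mirrors the transform in the issue: distinct keys, one lookup, then a map.
  def enrich(batch: Seq[Record]): Seq[Enriched] = {
    val uniqueItems = batch.map(_.itemId).toSet
    val metadata = getMetadata(uniqueItems)
    batch.map(r => Enriched(r, metadata.getOrElse(r.itemId, "unknown")))
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(Record("a", "1"), Record("b", "2"), Record("a", "3"))
    // One batch of three records costs a single lookup.
    enrich(records)
    val callsForOneBatch = lookupCalls
    // The same records split into three small batches cost three lookups.
    records.foreach(r => enrich(Seq(r)))
    val callsForSmallBatches = lookupCalls - callsForOneBatch
    println(s"one batch: $callsForOneBatch lookup(s), three batches: $callsForSmallBatches lookup(s)")
  }
}
```

If each lookup takes roughly constant time, as the description states, the number of batches rather than the number of records drives the total time spent waiting on the service.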
[jira] [Comment Edited] (SPARK-40507) Spark creates an optional columns in hive table for fields that are not null
[ https://issues.apache.org/jira/browse/SPARK-40507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17613787#comment-17613787 ]

Anil Dasari edited comment on SPARK-40507 at 10/6/22 8:48 PM:

Does anyone have thoughts on this?

was (Author: JIRAUSER283879):
does anyone have an thoughts on this ?

> Spark creates an optional columns in hive table for fields that are not null
>
> Key: SPARK-40507
> URL: https://issues.apache.org/jira/browse/SPARK-40507
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Anil Dasari
> Priority: Major
>
> DataFrame saveAsTable marks all columns as optional/nullable when creating the table, here:
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]
> (`outputColumns.toStructType.asNullable`)
> As a result, the source Parquet schema and the Hive table schema do not match, which is problematic when a job that processes large DataFrames uses Hive as temporary storage to avoid memory pressure.
> Hive 3.x supports NOT NULL constraints on table columns. Please add support for NOT NULL constraints on Spark SQL Hive tables.
[jira] [Commented] (SPARK-40507) Spark creates an optional columns in hive table for fields that are not null
[ https://issues.apache.org/jira/browse/SPARK-40507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17613787#comment-17613787 ]

Anil Dasari commented on SPARK-40507:

Does anyone have thoughts on this?

> Spark creates an optional columns in hive table for fields that are not null
>
> Key: SPARK-40507
> URL: https://issues.apache.org/jira/browse/SPARK-40507
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Anil Dasari
> Priority: Major
>
> DataFrame saveAsTable marks all columns as optional/nullable when creating the table, here:
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]
> (`outputColumns.toStructType.asNullable`)
> As a result, the source Parquet schema and the Hive table schema do not match, which is problematic when a job that processes large DataFrames uses Hive as temporary storage to avoid memory pressure.
> Hive 3.x supports NOT NULL constraints on table columns. Please add support for NOT NULL constraints on Spark SQL Hive tables.
[jira] [Updated] (SPARK-40507) Spark creates an optional columns in hive table for fields that are not null
[ https://issues.apache.org/jira/browse/SPARK-40507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anil Dasari updated SPARK-40507:
    Description:
DataFrame saveAsTable marks all columns as optional/nullable when creating the table, here:
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]
(`outputColumns.toStructType.asNullable`)
As a result, the source Parquet schema and the Hive table schema do not match, which is problematic when a job that processes large DataFrames uses Hive as temporary storage to avoid memory pressure.
Hive 3.x supports NOT NULL constraints on table columns. Please add support for NOT NULL constraints on Spark SQL Hive tables.

  was:
DataFrame saveAsTable marks all columns as optional/nullable when creating the table, here:
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]
(`outputColumns.toStructType.asNullable`)
As a result, the source Parquet schema and the Hive table schema do not match, which is problematic when a job that processes large DataFrames uses Hive as temporary storage to avoid memory pressure.
Hive 3.x supports non null constraints on table columns. Please add support non null constraints on Spark sql hive table.

> Spark creates an optional columns in hive table for fields that are not null
>
> Key: SPARK-40507
> URL: https://issues.apache.org/jira/browse/SPARK-40507
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Anil Dasari
> Priority: Major
>
> DataFrame saveAsTable marks all columns as optional/nullable when creating the table, here:
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]
> (`outputColumns.toStructType.asNullable`)
> As a result, the source Parquet schema and the Hive table schema do not match, which is problematic when a job that processes large DataFrames uses Hive as temporary storage to avoid memory pressure.
> Hive 3.x supports NOT NULL constraints on table columns. Please add support for NOT NULL constraints on Spark SQL Hive tables.
[jira] [Created] (SPARK-40507) Spark creates an optional columns in hive table for fields that are not null
Anil Dasari created SPARK-40507:

Summary: Spark creates an optional columns in hive table for fields that are not null
Key: SPARK-40507
URL: https://issues.apache.org/jira/browse/SPARK-40507
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.3.0
Reporter: Anil Dasari

DataFrame saveAsTable marks all columns as optional/nullable when creating the table, here:
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L531]
(`outputColumns.toStructType.asNullable`)
As a result, the source Parquet schema and the Hive table schema do not match, which is problematic when a job that processes large DataFrames uses Hive as temporary storage to avoid memory pressure.
Hive 3.x supports NOT NULL constraints on table columns. Please add support for NOT NULL constraints on Spark SQL Hive tables.
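The schema mismatch reported here can be pictured with a toy model. The sketch below deliberately avoids Spark's own `StructType` and `StructField`: `Field`, `Schema` and `nullabilityMismatches` are hypothetical stand-ins, and `asNullable` only imitates the effect the issue attributes to `outputColumns.toStructType.asNullable`, namely that every column ends up nullable.

```scala
// Toy schema model; Spark's real types are StructType/StructField, not these.
final case class Field(name: String, dataType: String, nullable: Boolean)

final case class Schema(fields: Seq[Field]) {
  // Models the effect described in the issue: every column becomes nullable.
  def asNullable: Schema = Schema(fields.map(_.copy(nullable = true)))

  // Names of columns whose nullability differs from `other`, matched by name.
  def nullabilityMismatches(other: Schema): Seq[String] = {
    val otherByName = other.fields.map(f => f.name -> f.nullable).toMap
    fields.collect {
      case f if otherByName.get(f.name).exists(_ != f.nullable) => f.name
    }
  }
}

object NullabilityDemo {
  def main(args: Array[String]): Unit = {
    // Source schema with one required and one optional column.
    val source = Schema(Seq(
      Field("id", "bigint", nullable = false),
      Field("comment", "string", nullable = true)))
    // Table schema after the NOT NULL information has been dropped.
    val table = source.asNullable
    println(source.nullabilityMismatches(table))
  }
}
```

A check of this kind, run against the real source and table schemas, is one way to list the columns that a NOT NULL constraint would need to cover.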