[jira] [Updated] (SPARK-38679) Expose the number partitions in a stage to TaskContext

2022-03-28 Thread Venki Korukanti (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venki Korukanti updated SPARK-38679:

Description: 
Add a new API to TaskContext that exposes the total number of partitions in the stage the task belongs to, so that the task knows what fraction of the computation it is doing.

With this extra information, users can also generate unique 32-bit int IDs as shown below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.

 
{code:scala}
rdd.mapPartitions { rowsIter =>
  val partitionId = TaskContext.get().partitionId()
  // Proposed new API: total number of partitions in the stage this task belongs to.
  val numPartitions = TaskContext.get().numPartitions()
  var i = 0
  rowsIter.map { row =>
    // Partition p emits p, p + numPartitions, p + 2 * numPartitions, ...
    // so IDs never collide across partitions.
    val rowId = partitionId + i * numPartitions
    i += 1
    (rowId, row)
  }
}{code}
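
To see why the IDs cannot collide across partitions, here is a small self-contained sketch (plain Scala, no Spark; the loop over partition IDs only simulates what the tasks above would each compute):

{code:scala}
object RowIdSchemeSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    (0 until numPartitions).foreach { partitionId =>
      // Same formula as the mapPartitions example: rowId = partitionId + i * numPartitions.
      val ids = (0 until 3).map(i => partitionId + i * numPartitions)
      println(s"partition $partitionId -> ${ids.mkString(", ")}")
    }
    // Output: partition 0 -> 0, 4, 8; partition 1 -> 1, 5, 9; partition 2 -> 2, 6, 10; ...
    // Each residue class modulo numPartitions belongs to exactly one partition.
  }
}{code}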
 

  was:
Add a new API to TaskContext that exposes the total number of partitions in the stage the task belongs to, so that the task knows what fraction of the computation it is doing.

With this extra information, users can also generate unique 32-bit int IDs as shown below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.

 
{code:scala}
rdd.mapPartitions { rowsIter =>
  val partitionId = TaskContext.get().partitionId()
  val numPartitions = TaskContext.get().numPartitions()
  var i = 0
  rowsIter.map { row =>
    val rowId = partitionId + i * numPartitions
    i += 1
    (rowId, row)
  }
}{code}
 


> Expose the number partitions in a stage to TaskContext
> --
>
> Key: SPARK-38679
> URL: https://issues.apache.org/jira/browse/SPARK-38679
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.1
>Reporter: Venki Korukanti
>Priority: Major
>
> Add a new API to TaskContext that exposes the total number of partitions in the stage 
> the task belongs to, so that the task knows what fraction of the computation it is doing.
> With this extra information, users can also generate unique 32-bit int IDs as 
> shown below, rather than using `monotonically_increasing_id`, which generates 64-bit 
> long IDs.
>  
> {code:scala}
> rdd.mapPartitions { rowsIter =>
>   val partitionId = TaskContext.get().partitionId()
>   val numPartitions = TaskContext.get().numPartitions()
>   var i = 0
>   rowsIter.map { row =>
>     val rowId = partitionId + i * numPartitions
>     i += 1
>     (rowId, row)
>   }
> }{code}
>  






[jira] [Updated] (SPARK-38679) Expose the number partitions in a stage to TaskContext

2022-03-28 Thread Venki Korukanti (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venki Korukanti updated SPARK-38679:

Description: 
Add a new API to TaskContext that exposes the total number of partitions in the stage the task belongs to, so that the task knows what fraction of the computation it is doing.

With this extra information, users can also generate unique 32-bit int IDs as shown below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.

 
{code:scala}
rdd.mapPartitions { rowsIter =>
  val partitionId = TaskContext.get().partitionId()
  val numPartitions = TaskContext.get().numPartitions()
  var i = 0
  rowsIter.map { row =>
    val rowId = partitionId + i * numPartitions
    i += 1
    (rowId, row)
  }
}{code}
 

  was:
Add a new API to TaskContext that exposes the total number of partitions in the stage the task belongs to, so that the task knows what fraction of the computation it is doing.

With this extra information, users can also generate unique 32-bit int IDs as shown below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.

 

{code:scala}
rdd.mapPartitions { rowsIter =>
  val partitionId = TaskContext.get().partitionId()
  val numPartitions = TaskContext.get().numPartitions()
  var i = 0
  rowsIter.map { row =>
    val rowId = partitionId + i * numPartitions
    i += 1
    (rowId, row)
  }
}{code}


> Expose the number partitions in a stage to TaskContext
> --
>
> Key: SPARK-38679
> URL: https://issues.apache.org/jira/browse/SPARK-38679
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.1
>Reporter: Venki Korukanti
>Priority: Major
>
> Add a new API to TaskContext that exposes the total number of partitions in the stage 
> the task belongs to, so that the task knows what fraction of the computation it is doing.
> With this extra information, users can also generate unique 32-bit int IDs as 
> shown below, rather than using `monotonically_increasing_id`, which generates 64-bit 
> long IDs.
>  
> {code:scala}
> rdd.mapPartitions { rowsIter =>
>   val partitionId = TaskContext.get().partitionId()
>   val numPartitions = TaskContext.get().numPartitions()
>   var i = 0
>   rowsIter.map { row =>
>     val rowId = partitionId + i * numPartitions
>     i += 1
>     (rowId, row)
>   }
> }{code}
>  






[jira] [Updated] (SPARK-38679) Expose the number partitions in a stage to TaskContext

2022-03-28 Thread Venki Korukanti (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venki Korukanti updated SPARK-38679:

Description: 
Add a new API to TaskContext that exposes the total number of partitions in the stage the task belongs to, so that the task knows what fraction of the computation it is doing.

With this extra information, users can also generate unique 32-bit int IDs as shown below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.

 

{code:scala}
rdd.mapPartitions { rowsIter =>
  val partitionId = TaskContext.get().partitionId()
  val numPartitions = TaskContext.get().numPartitions()
  var i = 0
  rowsIter.map { row =>
    val rowId = partitionId + i * numPartitions
    i += 1
    (rowId, row)
  }
}{code}

  was:
Add a new API to expose the total partition count to a task, so that the task knows what fraction of the computation it is doing.

With this extra information, users can also generate unique 32-bit int IDs as shown below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.

 

{code:scala}
rdd.mapPartitions { rowsIter =>
  val partitionId = TaskContext.get().partitionId()
  val numPartitions = TaskContext.get().numPartitions()
  var i = 0
  rowsIter.map { row =>
    val rowId = partitionId + i * numPartitions
    i += 1
    (rowId, row)
  }
}{code}


> Expose the number partitions in a stage to TaskContext
> --
>
> Key: SPARK-38679
> URL: https://issues.apache.org/jira/browse/SPARK-38679
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.1
>Reporter: Venki Korukanti
>Priority: Major
>
> Add a new API to TaskContext that exposes the total number of partitions in the stage 
> the task belongs to, so that the task knows what fraction of the computation it is doing.
> With this extra information, users can also generate unique 32-bit int IDs as 
> shown below, rather than using `monotonically_increasing_id`, which generates 64-bit 
> long IDs.
>  
> {code:scala}
> rdd.mapPartitions { rowsIter =>
>   val partitionId = TaskContext.get().partitionId()
>   val numPartitions = TaskContext.get().numPartitions()
>   var i = 0
>   rowsIter.map { row =>
>     val rowId = partitionId + i * numPartitions
>     i += 1
>     (rowId, row)
>   }
> }{code}






[jira] [Created] (SPARK-38679) Expose the number partitions in a stage to TaskContext

2022-03-28 Thread Venki Korukanti (Jira)
Venki Korukanti created SPARK-38679:
---

 Summary: Expose the number partitions in a stage to TaskContext
 Key: SPARK-38679
 URL: https://issues.apache.org/jira/browse/SPARK-38679
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.2.1
Reporter: Venki Korukanti


Add a new API to expose the total partition count to a task, so that the task knows what fraction of the computation it is doing.

With this extra information, users can also generate unique 32-bit int IDs as shown below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.

 

{code:scala}
rdd.mapPartitions { rowsIter =>
  val partitionId = TaskContext.get().partitionId()
  val numPartitions = TaskContext.get().numPartitions()
  var i = 0
  rowsIter.map { row =>
    val rowId = partitionId + i * numPartitions
    i += 1
    (rowId, row)
  }
}{code}






[jira] [Created] (SPARK-36236) RocksDB state store: Add additional metrics for better observability into state store operations

2021-07-20 Thread Venki Korukanti (Jira)
Venki Korukanti created SPARK-36236:
---

 Summary: RocksDB state store: Add additional metrics for better 
observability into state store operations
 Key: SPARK-36236
 URL: https://issues.apache.org/jira/browse/SPARK-36236
 Project: Spark
  Issue Type: Sub-task
  Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Venki Korukanti


Proposing to add the following new metrics to {{customMetrics}} under {{stateOperators}} in the {{StreamingQueryProgress}} event. These metrics give better visibility into the RocksDB-based state store in streaming jobs (a sketch of reading them follows the list).
 * {{rocksdbGetCount}}: number of get calls to the DB (doesn't include gets from the WriteBatch, the in-memory batch used for staging writes)
 * {{rocksdbPutCount}}: number of put calls to the DB (doesn't include puts to the WriteBatch, the in-memory batch used for staging writes)
 * {{rocksdbTotalBytesReadByGet/rocksdbTotalBytesWrittenByPut}}: number of uncompressed bytes read/written by get/put operations
 * {{rocksdbReadBlockCacheHitCount/rocksdbReadBlockCacheMissCount}}: indicate how effective the RocksDB block cache is at avoiding local disk reads
 * {{rocksdbTotalBytesReadByCompaction/rocksdbTotalBytesWrittenByCompaction}}: number of bytes the compaction process read from and wrote to disk
 * {{rocksdbTotalCompactionTime}}: time (in ns) taken by compactions (both background and the optional compaction initiated during the commit)
 * {{rocksdbWriterStallDuration}}: time (in ns) the writer has stalled due to a background compaction or flushing of the immutable memtables to disk
 * {{rocksdbTotalBytesReadThroughIterator}}: some stateful operations (such as timeout processing in FlatMapGroupsWithState and watermarking) require reading all of the data in the DB through an iterator; this metric reports the total size of uncompressed data read via the iterator
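
A minimal, hedged sketch of how these metrics could be consumed once exposed, using the public {{StreamingQueryListener}} API (the metric names are the ones proposed above and would appear in {{customMetrics}} only after this change):

{code:scala}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Logs a few of the proposed RocksDB metrics for every stateful operator in each progress update.
class RocksDBMetricsListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.stateOperators.foreach { op =>
      val custom = op.customMetrics // java.util.Map[String, java.lang.Long]
      Seq("rocksdbGetCount", "rocksdbPutCount", "rocksdbTotalCompactionTime").foreach { name =>
        if (custom.containsKey(name)) {
          println(s"batch ${event.progress.batchId}: $name = ${custom.get(name)}")
        }
      }
    }
  }
}

// Registration, assuming an active SparkSession named `spark`:
// spark.streams.addListener(new RocksDBMetricsListener)
{code}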






[jira] [Created] (SPARK-35896) [SS] Include more granular metrics for stateful operators in StreamingQueryProgress

2021-06-25 Thread Venki Korukanti (Jira)
Venki Korukanti created SPARK-35896:
---

 Summary: [SS] Include more granular metrics for stateful operators 
in StreamingQueryProgress
 Key: SPARK-35896
 URL: https://issues.apache.org/jira/browse/SPARK-35896
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Venki Korukanti


Currently the streaming progress is missing a few important stateful operator metrics in {{StateOperatorProgress}}. Each stateful operator consists of multiple steps. For example, {{flatMapGroupsWithState}} has two major steps: 1) processing the input and 2) timeout processing to remove expired entries from the state. The main motivation is to track the time taken by each individual step (timeout processing, watermark processing, etc.) and how much data is processed, in order to pinpoint bottlenecks and reason about why some microbatches are slower than others in the same job.

Below are the final metrics common to all stateful operators (the ones in _*bold-italic*_ are proposed new). These metrics live in {{StateOperatorProgress}}, which is part of {{StreamingQueryProgress}}; a small sketch of reading them follows the list.
 * _*operatorName*_ - State operator name. Can help us identify operator-specific slowness and state store usage patterns, e.g. "dedupe" (derived using {{StateStoreWriter.shortName}})
 * _numRowsTotal_ - number of rows in the state store across all tasks in a 
stage where the operator has executed.
 * _numRowsUpdated_ - number of rows added to or updated in the store
 * _*allUpdatesTimeMs*_ - time taken to add new rows or update existing state 
store rows across all tasks in a stage where the operator has executed.
 * _*numRowsRemoved*_ - number of rows deleted from the state store as part of the state cleanup mechanism, across all tasks in a stage where the operator has executed. This number helps measure state store deletions and their impact on checkpoint commit and other latencies.
 * _*allRemovalsTimeMs*_ - time taken to remove rows from the state store as part of state cleanup (this also includes iterating through the entire state store to find which rows to delete), across all tasks in a stage where the operator has executed. If jobs spend significant time here, it may justify a better layout in the state store so that only the required rows are read rather than the entire state store, as is done currently.
 * _*commitTimeMs*_ - time taken to commit the state store changes to external 
storage for checkpointing. This is cumulative across all tasks in a stage where 
this operator has executed.
 * _*numShufflePartitions*_ - number of shuffle partitions this state operator is part of. Currently, metrics such as the times above are aggregated across all tasks in a stage where the operator has executed. Having the number of shuffle partitions (which corresponds to the number of tasks) helps us find the average per-task contribution to a metric.
 * _*numStateStores*_ - number of state stores in the operator across all tasks in the stage. Some stateful operators have more than one state store (e.g. stream-stream join). Tracking this number helps us find correlations between state store instances and microbatch latency.
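
A hedged sketch (assuming an active {{StreamingQuery}} named {{query}}) of inspecting per-operator progress after a microbatch; the existing fields are available today, while the proposed fields are shown commented out since they would exist only after this change:

{code:scala}
val progress = query.lastProgress  // StreamingQueryProgress of the last completed microbatch
progress.stateOperators.foreach { op =>
  // Existing StateOperatorProgress fields:
  println(s"numRowsTotal = ${op.numRowsTotal}, numRowsUpdated = ${op.numRowsUpdated}")
  // Proposed additions would be read the same way, e.g.:
  // println(s"operatorName = ${op.operatorName}, numShufflePartitions = ${op.numShufflePartitions}")
}
{code}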






[jira] [Created] (SPARK-35880) [SS] Track the number of duplicates dropped in streaming dedupe operator

2021-06-24 Thread Venki Korukanti (Jira)
Venki Korukanti created SPARK-35880:
---

 Summary: [SS] Track the number of duplicates dropped in streaming 
dedupe operator
 Key: SPARK-35880
 URL: https://issues.apache.org/jira/browse/SPARK-35880
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Venki Korukanti


Currently there is no way to find out how many duplicate rows in the input were dropped. Having this metric will help track down incorrect-result issues.






[jira] [Updated] (SPARK-35799) Fix the allUpdatesTimeMs metric measuring in FlatMapGroupsWithStateExec

2021-06-17 Thread Venki Korukanti (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venki Korukanti updated SPARK-35799:

Description: 
Metric {{allUpdatesTimeMs}} meant to capture the start to end walltime of the 
operator {{FlatMapGroupsWithStateExec}}, but currently it just 
[captures|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L121]
 the iterator creation time. 

Fix it to measure similar to how other stateful operators measure. Example one 
[here|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L406].
 This measurement is not perfect due to the nature of the lazy iterator and 
also includes the time the consumer operator spent in processing the current 
operator output, but it should give a good signal when comparing the metric in 
one microbatch to the metric in another microbatch.
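
A small self-contained sketch (plain Scala, hypothetical names, not Spark's actual helper) of the intended pattern: record the start time before building the lazy iterator and accumulate the elapsed wall time only after the iterator has been fully consumed downstream:

{code:scala}
object WalltimeMetricSketch {
  def main(args: Array[String]): Unit = {
    var allUpdatesTimeMs = 0L                      // stands in for the SQL metric
    val inputIter = (1 to 5).iterator

    val startNs = System.nanoTime()
    // Lazily transformed output; nothing runs until a downstream consumer pulls rows.
    val outputIter = inputIter.map { n => Thread.sleep(10); n * 2 }

    // Consume the iterator fully, then record the start-to-end wall time, which
    // includes the per-row processing time rather than just iterator creation.
    outputIter.foreach(println)
    allUpdatesTimeMs += (System.nanoTime() - startNs) / 1000000

    println(s"allUpdatesTimeMs = $allUpdatesTimeMs")
  }
}{code}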

  was:
The metric `allUpdatesTimeMs` is meant to capture the start-to-end wall time of the operator `FlatMapGroupsWithStateExec`, but currently it only [captures|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L121] the iterator creation time.

Fix it to measure the same way other stateful operators do; see an example [here|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L406]. This measurement is not perfect due to the nature of the lazy iterator, and it also includes the time the consumer operator spends processing the current operator's output, but it should give a good signal when comparing the metric across microbatches.


> Fix the allUpdatesTimeMs metric measuring in FlatMapGroupsWithStateExec
> ---
>
> Key: SPARK-35799
> URL: https://issues.apache.org/jira/browse/SPARK-35799
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.1.2
>Reporter: Venki Korukanti
>Priority: Minor
>
> The metric {{allUpdatesTimeMs}} is meant to capture the start-to-end wall time of the 
> operator {{FlatMapGroupsWithStateExec}}, but currently it only 
> [captures|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L121]
>  the iterator creation time.
> Fix it to measure the same way other stateful operators do; see an example 
> [here|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L406].
>  This measurement is not perfect due to the nature of the lazy iterator, and it also 
> includes the time the consumer operator spends processing the current operator's 
> output, but it should give a good signal when comparing the metric across microbatches.






[jira] [Created] (SPARK-35799) Fix the allUpdatesTimeMs metric measuring in FlatMapGroupsWithStateExec

2021-06-17 Thread Venki Korukanti (Jira)
Venki Korukanti created SPARK-35799:
---

 Summary: Fix the allUpdatesTimeMs metric measuring in 
FlatMapGroupsWithStateExec
 Key: SPARK-35799
 URL: https://issues.apache.org/jira/browse/SPARK-35799
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Venki Korukanti


The metric `allUpdatesTimeMs` is meant to capture the start-to-end wall time of the operator `FlatMapGroupsWithStateExec`, but currently it only [captures|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L121] the iterator creation time.

Fix it to measure the same way other stateful operators do; see an example [here|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L406]. This measurement is not perfect due to the nature of the lazy iterator, and it also includes the time the consumer operator spends processing the current operator's output, but it should give a good signal when comparing the metric across microbatches.






[jira] [Created] (SPARK-35763) Minor refactor of StateStoreCustomMetric

2021-06-14 Thread Venki Korukanti (Jira)
Venki Korukanti created SPARK-35763:
---

 Summary: Minor refactor of StateStoreCustomMetric
 Key: SPARK-35763
 URL: https://issues.apache.org/jira/browse/SPARK-35763
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Venki Korukanti


Code in {{[SymmetricHashJoinStateManager|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L321]}} relies on the subclass implementations of [StateStoreCustomMetric|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L187].

If a new subclass of {{StateStoreCustomMetric}} is added, it requires code changes to {{SymmetricHashJoinStateManager}}, and we may miss that update if there is no existing test coverage.

To fix this, add a new method {{withNewDesc(desc: String)}} to the {{StateStoreCustomMetric}} trait and use it in {{SymmetricHashJoinStateManager}}. A simplified sketch of the idea is shown below.
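
A simplified, hedged sketch of the refactor (the classes below are stand-ins modeled on Spark's, not the real implementations): the trait itself knows how to produce a copy with a new description, so callers no longer pattern-match on concrete subclasses.

{code:scala}
trait StateStoreCustomMetric {
  def name: String
  def desc: String
  // New method proposed here: each metric knows how to copy itself with a new description.
  def withNewDesc(newDesc: String): StateStoreCustomMetric
}

case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric {
  override def withNewDesc(newDesc: String): StateStoreCustomSumMetric = copy(desc = newDesc)
}

case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric {
  override def withNewDesc(newDesc: String): StateStoreCustomSizeMetric = copy(desc = newDesc)
}

object JoinStateManagerSketch {
  // Before: a match on every concrete subclass; after: a single polymorphic call.
  def prefixDescs(side: String, metrics: Seq[StateStoreCustomMetric]): Seq[StateStoreCustomMetric] =
    metrics.map(m => m.withNewDesc(s"$side: ${m.desc}"))

  def main(args: Array[String]): Unit = {
    val metrics = Seq(
      StateStoreCustomSumMetric("puts", "number of puts"),
      StateStoreCustomSizeMetric("memoryUsedBytes", "memory used by the state store"))
    prefixDescs("left side", metrics).foreach(m => println(s"${m.name}: ${m.desc}"))
  }
}{code}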


